You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/06 05:45:38 UTC

[arrow-datafusion] 01/01: add datafusion-expr module

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch datafusion-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit b53e5a16dfec188d19fc69c481c4ccf29985b952
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:45:28 2022 +0800

    add datafusion-expr module
---
 Cargo.toml                                       |   1 +
 Cargo.toml => datafusion-expr/Cargo.toml         |  38 ++++---
 datafusion-expr/src/aggregate_function.rs        |  93 ++++++++++++++++
 datafusion-expr/src/lib.rs                       |  22 ++++
 datafusion-expr/src/window_function.rs           | 133 +++++++++++++++++++++++
 datafusion/Cargo.toml                            |   1 +
 datafusion/src/physical_plan/aggregates.rs       |  76 +------------
 datafusion/src/physical_plan/window_functions.rs | 119 +-------------------
 8 files changed, 277 insertions(+), 206 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 81f6bb5..f74f53c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,6 +19,7 @@
 members = [
     "datafusion",
     "datafusion-common",
+    "datafusion-expr",
     "datafusion-cli",
     "datafusion-examples",
     "benchmarks",
diff --git a/Cargo.toml b/datafusion-expr/Cargo.toml
similarity index 54%
copy from Cargo.toml
copy to datafusion-expr/Cargo.toml
index 81f6bb5..3cac735 100644
--- a/Cargo.toml
+++ b/datafusion-expr/Cargo.toml
@@ -15,20 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-members = [
-    "datafusion",
-    "datafusion-common",
-    "datafusion-cli",
-    "datafusion-examples",
-    "benchmarks",
-    "ballista/rust/client",
-    "ballista/rust/core",
-    "ballista/rust/executor",
-    "ballista/rust/scheduler",
-    "ballista-examples",
-]
+[package]
+name = "datafusion-expr"
+description = "DataFusion is an in-memory query engine that uses Apache Arrow as the memory model"
+version = "6.0.0"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+readme = "../README.md"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "query", "sql" ]
+publish = false
+edition = "2021"
+rust-version = "1.58"
 
-[profile.release]
-lto = true
-codegen-units = 1
+[lib]
+name = "datafusion_expr"
+path = "src/lib.rs"
+
+[features]
+
+[dependencies]
+datafusion-common = { path = "../datafusion-common" }
+arrow = { version = "8.0.0", features = ["prettyprint"] }
diff --git a/datafusion-expr/src/aggregate_function.rs b/datafusion-expr/src/aggregate_function.rs
new file mode 100644
index 0000000..8f12e88
--- /dev/null
+++ b/datafusion-expr/src/aggregate_function.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::{DataFusionError, Result};
+use std::{fmt, str::FromStr};
+
+/// Enum of all built-in aggregate functions
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
+pub enum AggregateFunction {
+    /// count
+    Count,
+    /// sum
+    Sum,
+    /// min
+    Min,
+    /// max
+    Max,
+    /// avg
+    Avg,
+    /// Approximate distinct aggregate function (approx_distinct)
+    ApproxDistinct,
+    /// array_agg
+    ArrayAgg,
+    /// Variance (Sample)
+    Variance,
+    /// Variance (Population)
+    VariancePop,
+    /// Standard Deviation (Sample)
+    Stddev,
+    /// Standard Deviation (Population)
+    StddevPop,
+    /// Covariance (Sample)
+    Covariance,
+    /// Covariance (Population)
+    CovariancePop,
+    /// Correlation
+    Correlation,
+    /// Approximate continuous percentile function
+    ApproxPercentileCont,
+}
+
+impl fmt::Display for AggregateFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // uppercase of the debug.
+        write!(f, "{}", format!("{:?}", self).to_uppercase())
+    }
+}
+
+impl FromStr for AggregateFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<AggregateFunction> {
+        Ok(match name {
+            "min" => AggregateFunction::Min,
+            "max" => AggregateFunction::Max,
+            "count" => AggregateFunction::Count,
+            "avg" => AggregateFunction::Avg,
+            "sum" => AggregateFunction::Sum,
+            "approx_distinct" => AggregateFunction::ApproxDistinct,
+            "array_agg" => AggregateFunction::ArrayAgg,
+            "var" => AggregateFunction::Variance,
+            "var_samp" => AggregateFunction::Variance,
+            "var_pop" => AggregateFunction::VariancePop,
+            "stddev" => AggregateFunction::Stddev,
+            "stddev_samp" => AggregateFunction::Stddev,
+            "stddev_pop" => AggregateFunction::StddevPop,
+            "covar" => AggregateFunction::Covariance,
+            "covar_samp" => AggregateFunction::Covariance,
+            "covar_pop" => AggregateFunction::CovariancePop,
+            "corr" => AggregateFunction::Correlation,
+            "approx_percentile_cont" => AggregateFunction::ApproxPercentileCont,
+            _ => {
+                return Err(DataFusionError::Plan(format!(
+                    "There is no built-in function named {}",
+                    name
+                )));
+            }
+        })
+    }
+}
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
new file mode 100644
index 0000000..b6eaaf7
--- /dev/null
+++ b/datafusion-expr/src/lib.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod aggregate_function;
+mod window_function;
+
+pub use aggregate_function::AggregateFunction;
+pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/window_function.rs b/datafusion-expr/src/window_function.rs
new file mode 100644
index 0000000..2511874
--- /dev/null
+++ b/datafusion-expr/src/window_function.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aggregate_function::AggregateFunction;
+use datafusion_common::{DataFusionError, Result};
+use std::{fmt, str::FromStr};
+
+/// A window function: either an aggregate function or a built-in window function
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum WindowFunction {
+    /// window function that leverages an aggregate function
+    AggregateFunction(AggregateFunction),
+    /// window function that leverages a built-in window function
+    BuiltInWindowFunction(BuiltInWindowFunction),
+}
+
+impl FromStr for WindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<WindowFunction> {
+        let name = name.to_lowercase();
+        if let Ok(aggregate) = AggregateFunction::from_str(name.as_str()) {
+            Ok(WindowFunction::AggregateFunction(aggregate))
+        } else if let Ok(built_in_function) =
+            BuiltInWindowFunction::from_str(name.as_str())
+        {
+            Ok(WindowFunction::BuiltInWindowFunction(built_in_function))
+        } else {
+            Err(DataFusionError::Plan(format!(
+                "There is no window function named {}",
+                name
+            )))
+        }
+    }
+}
+
+impl fmt::Display for BuiltInWindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            BuiltInWindowFunction::RowNumber => write!(f, "ROW_NUMBER"),
+            BuiltInWindowFunction::Rank => write!(f, "RANK"),
+            BuiltInWindowFunction::DenseRank => write!(f, "DENSE_RANK"),
+            BuiltInWindowFunction::PercentRank => write!(f, "PERCENT_RANK"),
+            BuiltInWindowFunction::CumeDist => write!(f, "CUME_DIST"),
+            BuiltInWindowFunction::Ntile => write!(f, "NTILE"),
+            BuiltInWindowFunction::Lag => write!(f, "LAG"),
+            BuiltInWindowFunction::Lead => write!(f, "LEAD"),
+            BuiltInWindowFunction::FirstValue => write!(f, "FIRST_VALUE"),
+            BuiltInWindowFunction::LastValue => write!(f, "LAST_VALUE"),
+            BuiltInWindowFunction::NthValue => write!(f, "NTH_VALUE"),
+        }
+    }
+}
+
+impl fmt::Display for WindowFunction {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            WindowFunction::AggregateFunction(fun) => fun.fmt(f),
+            WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
+        }
+    }
+}
+
+/// A built-in window function (as opposed to an aggregate function used as a window function)
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum BuiltInWindowFunction {
+    /// number of the current row within its partition, counting from 1
+    RowNumber,
+    /// rank of the current row with gaps; same as row_number of its first peer
+    Rank,
+    /// rank of the current row without gaps; this function counts peer groups
+    DenseRank,
+    /// relative rank of the current row: (rank - 1) / (total rows - 1)
+    PercentRank,
+    /// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
+    CumeDist,
+    /// integer ranging from 1 to the argument value, dividing the partition as equally as possible
+    Ntile,
+    /// returns value evaluated at the row that is offset rows before the current row within the partition;
+    /// if there is no such row, instead return default (which must be of the same type as value).
+    /// Both offset and default are evaluated with respect to the current row.
+    /// If omitted, offset defaults to 1 and default to null
+    Lag,
+    /// returns value evaluated at the row that is offset rows after the current row within the partition;
+    /// if there is no such row, instead return default (which must be of the same type as value).
+    /// Both offset and default are evaluated with respect to the current row.
+    /// If omitted, offset defaults to 1 and default to null
+    Lead,
+    /// returns value evaluated at the row that is the first row of the window frame
+    FirstValue,
+    /// returns value evaluated at the row that is the last row of the window frame
+    LastValue,
+    /// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row
+    NthValue,
+}
+
+impl FromStr for BuiltInWindowFunction {
+    type Err = DataFusionError;
+    fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
+        Ok(match name.to_uppercase().as_str() {
+            "ROW_NUMBER" => BuiltInWindowFunction::RowNumber,
+            "RANK" => BuiltInWindowFunction::Rank,
+            "DENSE_RANK" => BuiltInWindowFunction::DenseRank,
+            "PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
+            "CUME_DIST" => BuiltInWindowFunction::CumeDist,
+            "NTILE" => BuiltInWindowFunction::Ntile,
+            "LAG" => BuiltInWindowFunction::Lag,
+            "LEAD" => BuiltInWindowFunction::Lead,
+            "FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
+            "LAST_VALUE" => BuiltInWindowFunction::LastValue,
+            "NTH_VALUE" => BuiltInWindowFunction::NthValue,
+            _ => {
+                return Err(DataFusionError::Plan(format!(
+                    "There is no built-in window function named {}",
+                    name
+                )))
+            }
+        })
+    }
+}
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 180e037..51f78b4 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -51,6 +51,7 @@ avro = ["avro-rs", "num-traits", "datafusion-common/avro"]
 
 [dependencies]
 datafusion-common = { path = "../datafusion-common" }
+datafusion-expr = { path = "../datafusion-expr" }
 ahash = { version = "0.7", default-features = false }
 hashbrown = { version = "0.12", features = ["raw"] }
 arrow = { version = "8.0.0", features = ["prettyprint"] }
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index 8fc94d3..a1531d4 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -38,7 +38,7 @@ use expressions::{
     avg_return_type, correlation_return_type, covariance_return_type, stddev_return_type,
     sum_return_type, variance_return_type,
 };
-use std::{fmt, str::FromStr, sync::Arc};
+use std::sync::Arc;
 
 /// the implementation of an aggregate function
 pub type AccumulatorFunctionImplementation =
@@ -49,79 +49,7 @@ pub type AccumulatorFunctionImplementation =
 pub type StateTypeFunction =
     Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
 
-/// Enum of all built-in aggregate functions
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
-pub enum AggregateFunction {
-    /// count
-    Count,
-    /// sum
-    Sum,
-    /// min
-    Min,
-    /// max
-    Max,
-    /// avg
-    Avg,
-    /// Approximate aggregate function
-    ApproxDistinct,
-    /// array_agg
-    ArrayAgg,
-    /// Variance (Sample)
-    Variance,
-    /// Variance (Population)
-    VariancePop,
-    /// Standard Deviation (Sample)
-    Stddev,
-    /// Standard Deviation (Population)
-    StddevPop,
-    /// Covariance (Sample)
-    Covariance,
-    /// Covariance (Population)
-    CovariancePop,
-    /// Correlation
-    Correlation,
-    /// Approximate continuous percentile function
-    ApproxPercentileCont,
-}
-
-impl fmt::Display for AggregateFunction {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        // uppercase of the debug.
-        write!(f, "{}", format!("{:?}", self).to_uppercase())
-    }
-}
-
-impl FromStr for AggregateFunction {
-    type Err = DataFusionError;
-    fn from_str(name: &str) -> Result<AggregateFunction> {
-        Ok(match name {
-            "min" => AggregateFunction::Min,
-            "max" => AggregateFunction::Max,
-            "count" => AggregateFunction::Count,
-            "avg" => AggregateFunction::Avg,
-            "sum" => AggregateFunction::Sum,
-            "approx_distinct" => AggregateFunction::ApproxDistinct,
-            "array_agg" => AggregateFunction::ArrayAgg,
-            "var" => AggregateFunction::Variance,
-            "var_samp" => AggregateFunction::Variance,
-            "var_pop" => AggregateFunction::VariancePop,
-            "stddev" => AggregateFunction::Stddev,
-            "stddev_samp" => AggregateFunction::Stddev,
-            "stddev_pop" => AggregateFunction::StddevPop,
-            "covar" => AggregateFunction::Covariance,
-            "covar_samp" => AggregateFunction::Covariance,
-            "covar_pop" => AggregateFunction::CovariancePop,
-            "corr" => AggregateFunction::Correlation,
-            "approx_percentile_cont" => AggregateFunction::ApproxPercentileCont,
-            _ => {
-                return Err(DataFusionError::Plan(format!(
-                    "There is no built-in function named {}",
-                    name
-                )));
-            }
-        })
-    }
-}
+pub use datafusion_expr::AggregateFunction;
 
 /// Returns the datatype of the aggregate function.
 /// This is used to get the returned data type for aggregate expr.
diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs
index 178a55a..1dcac3f 100644
--- a/datafusion/src/physical_plan/window_functions.rs
+++ b/datafusion/src/physical_plan/window_functions.rs
@@ -23,130 +23,17 @@
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::functions::{TypeSignature, Volatility};
 use crate::physical_plan::{
-    aggregates, aggregates::AggregateFunction, functions::Signature,
-    type_coercion::data_types, windows::find_ranges_in_range, PhysicalExpr,
+    aggregates, functions::Signature, type_coercion::data_types,
+    windows::find_ranges_in_range, PhysicalExpr,
 };
 use arrow::array::ArrayRef;
 use arrow::datatypes::DataType;
 use arrow::datatypes::Field;
 use arrow::record_batch::RecordBatch;
+pub use datafusion_expr::{BuiltInWindowFunction, WindowFunction};
 use std::any::Any;
 use std::ops::Range;
 use std::sync::Arc;
-use std::{fmt, str::FromStr};
-
-/// WindowFunction
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum WindowFunction {
-    /// window function that leverages an aggregate function
-    AggregateFunction(AggregateFunction),
-    /// window function that leverages a built-in window function
-    BuiltInWindowFunction(BuiltInWindowFunction),
-}
-
-impl FromStr for WindowFunction {
-    type Err = DataFusionError;
-    fn from_str(name: &str) -> Result<WindowFunction> {
-        let name = name.to_lowercase();
-        if let Ok(aggregate) = AggregateFunction::from_str(name.as_str()) {
-            Ok(WindowFunction::AggregateFunction(aggregate))
-        } else if let Ok(built_in_function) =
-            BuiltInWindowFunction::from_str(name.as_str())
-        {
-            Ok(WindowFunction::BuiltInWindowFunction(built_in_function))
-        } else {
-            Err(DataFusionError::Plan(format!(
-                "There is no window function named {}",
-                name
-            )))
-        }
-    }
-}
-
-impl fmt::Display for BuiltInWindowFunction {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            BuiltInWindowFunction::RowNumber => write!(f, "ROW_NUMBER"),
-            BuiltInWindowFunction::Rank => write!(f, "RANK"),
-            BuiltInWindowFunction::DenseRank => write!(f, "DENSE_RANK"),
-            BuiltInWindowFunction::PercentRank => write!(f, "PERCENT_RANK"),
-            BuiltInWindowFunction::CumeDist => write!(f, "CUME_DIST"),
-            BuiltInWindowFunction::Ntile => write!(f, "NTILE"),
-            BuiltInWindowFunction::Lag => write!(f, "LAG"),
-            BuiltInWindowFunction::Lead => write!(f, "LEAD"),
-            BuiltInWindowFunction::FirstValue => write!(f, "FIRST_VALUE"),
-            BuiltInWindowFunction::LastValue => write!(f, "LAST_VALUE"),
-            BuiltInWindowFunction::NthValue => write!(f, "NTH_VALUE"),
-        }
-    }
-}
-
-impl fmt::Display for WindowFunction {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        match self {
-            WindowFunction::AggregateFunction(fun) => fun.fmt(f),
-            WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
-        }
-    }
-}
-
-/// An aggregate function that is part of a built-in window function
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum BuiltInWindowFunction {
-    /// number of the current row within its partition, counting from 1
-    RowNumber,
-    /// rank of the current row with gaps; same as row_number of its first peer
-    Rank,
-    /// ank of the current row without gaps; this function counts peer groups
-    DenseRank,
-    /// relative rank of the current row: (rank - 1) / (total rows - 1)
-    PercentRank,
-    /// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
-    CumeDist,
-    /// integer ranging from 1 to the argument value, dividing the partition as equally as possible
-    Ntile,
-    /// returns value evaluated at the row that is offset rows before the current row within the partition;
-    /// if there is no such row, instead return default (which must be of the same type as value).
-    /// Both offset and default are evaluated with respect to the current row.
-    /// If omitted, offset defaults to 1 and default to null
-    Lag,
-    /// returns value evaluated at the row that is offset rows after the current row within the partition;
-    /// if there is no such row, instead return default (which must be of the same type as value).
-    /// Both offset and default are evaluated with respect to the current row.
-    /// If omitted, offset defaults to 1 and default to null
-    Lead,
-    /// returns value evaluated at the row that is the first row of the window frame
-    FirstValue,
-    /// returns value evaluated at the row that is the last row of the window frame
-    LastValue,
-    /// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row
-    NthValue,
-}
-
-impl FromStr for BuiltInWindowFunction {
-    type Err = DataFusionError;
-    fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
-        Ok(match name.to_uppercase().as_str() {
-            "ROW_NUMBER" => BuiltInWindowFunction::RowNumber,
-            "RANK" => BuiltInWindowFunction::Rank,
-            "DENSE_RANK" => BuiltInWindowFunction::DenseRank,
-            "PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
-            "CUME_DIST" => BuiltInWindowFunction::CumeDist,
-            "NTILE" => BuiltInWindowFunction::Ntile,
-            "LAG" => BuiltInWindowFunction::Lag,
-            "LEAD" => BuiltInWindowFunction::Lead,
-            "FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
-            "LAST_VALUE" => BuiltInWindowFunction::LastValue,
-            "NTH_VALUE" => BuiltInWindowFunction::NthValue,
-            _ => {
-                return Err(DataFusionError::Plan(format!(
-                    "There is no built-in window function named {}",
-                    name
-                )))
-            }
-        })
-    }
-}
 
 /// Returns the datatype of the window function
 pub fn return_type(