You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/06 05:45:38 UTC
[arrow-datafusion] 01/01: add datafusion-expr module
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch datafusion-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit b53e5a16dfec188d19fc69c481c4ccf29985b952
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sun Feb 6 13:45:28 2022 +0800
add datafusion-expr module
---
Cargo.toml | 1 +
Cargo.toml => datafusion-expr/Cargo.toml | 38 ++++---
datafusion-expr/src/aggregate_function.rs | 93 ++++++++++++++++
datafusion-expr/src/lib.rs | 22 ++++
datafusion-expr/src/window_function.rs | 133 +++++++++++++++++++++++
datafusion/Cargo.toml | 1 +
datafusion/src/physical_plan/aggregates.rs | 76 +------------
datafusion/src/physical_plan/window_functions.rs | 119 +-------------------
8 files changed, 277 insertions(+), 206 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 81f6bb5..f74f53c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,6 +19,7 @@
members = [
"datafusion",
"datafusion-common",
+ "datafusion-expr",
"datafusion-cli",
"datafusion-examples",
"benchmarks",
diff --git a/Cargo.toml b/datafusion-expr/Cargo.toml
similarity index 54%
copy from Cargo.toml
copy to datafusion-expr/Cargo.toml
index 81f6bb5..3cac735 100644
--- a/Cargo.toml
+++ b/datafusion-expr/Cargo.toml
@@ -15,20 +15,26 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-members = [
- "datafusion",
- "datafusion-common",
- "datafusion-cli",
- "datafusion-examples",
- "benchmarks",
- "ballista/rust/client",
- "ballista/rust/core",
- "ballista/rust/executor",
- "ballista/rust/scheduler",
- "ballista-examples",
-]
+[package]
+name = "datafusion-expr"
+description = "DataFusion is an in-memory query engine that uses Apache Arrow as the memory model"
+version = "6.0.0"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+readme = "../README.md"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "query", "sql" ]
+publish = false
+edition = "2021"
+rust-version = "1.58"
-[profile.release]
-lto = true
-codegen-units = 1
+[lib]
+name = "datafusion_expr"
+path = "src/lib.rs"
+
+[features]
+
+[dependencies]
+datafusion-common = { path = "../datafusion-common" }
+arrow = { version = "8.0.0", features = ["prettyprint"] }
diff --git a/datafusion-expr/src/aggregate_function.rs b/datafusion-expr/src/aggregate_function.rs
new file mode 100644
index 0000000..8f12e88
--- /dev/null
+++ b/datafusion-expr/src/aggregate_function.rs
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::{DataFusionError, Result};
+use std::{fmt, str::FromStr};
+
+/// Enum of all built-in aggregate functions; parsed from lowercase SQL names via `FromStr`
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
+pub enum AggregateFunction {
+ /// count
+ Count,
+ /// sum
+ Sum,
+ /// min
+ Min,
+ /// max
+ Max,
+ /// avg
+ Avg,
+ /// Approximate distinct aggregate; SQL name `approx_distinct`
+ ApproxDistinct,
+ /// array_agg
+ ArrayAgg,
+ /// Variance (Sample); SQL names `var` and `var_samp`
+ Variance,
+ /// Variance (Population); SQL name `var_pop`
+ VariancePop,
+ /// Standard Deviation (Sample); SQL names `stddev` and `stddev_samp`
+ Stddev,
+ /// Standard Deviation (Population); SQL name `stddev_pop`
+ StddevPop,
+ /// Covariance (Sample); SQL names `covar` and `covar_samp`
+ Covariance,
+ /// Covariance (Population); SQL name `covar_pop`
+ CovariancePop,
+ /// Correlation; SQL name `corr`
+ Correlation,
+ /// Approximate continuous percentile function; SQL name `approx_percentile_cont`
+ ApproxPercentileCont,
+}
+
+impl fmt::Display for AggregateFunction {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ // Upper-cased `Debug` variant name (e.g. `ApproxDistinct` -> "APPROXDISTINCT"), not the snake_case SQL name.
+ write!(f, "{}", format!("{:?}", self).to_uppercase())
+ }
+}
+
+impl FromStr for AggregateFunction {
+ type Err = DataFusionError;
+ fn from_str(name: &str) -> Result<AggregateFunction> {
+ Ok(match name { // exact, case-sensitive match: only the lowercase spellings below are accepted
+ "min" => AggregateFunction::Min,
+ "max" => AggregateFunction::Max,
+ "count" => AggregateFunction::Count,
+ "avg" => AggregateFunction::Avg,
+ "sum" => AggregateFunction::Sum,
+ "approx_distinct" => AggregateFunction::ApproxDistinct,
+ "array_agg" => AggregateFunction::ArrayAgg,
+ "var" => AggregateFunction::Variance, // same variant as "var_samp"
+ "var_samp" => AggregateFunction::Variance,
+ "var_pop" => AggregateFunction::VariancePop,
+ "stddev" => AggregateFunction::Stddev, // same variant as "stddev_samp"
+ "stddev_samp" => AggregateFunction::Stddev,
+ "stddev_pop" => AggregateFunction::StddevPop,
+ "covar" => AggregateFunction::Covariance, // same variant as "covar_samp"
+ "covar_samp" => AggregateFunction::Covariance,
+ "covar_pop" => AggregateFunction::CovariancePop,
+ "corr" => AggregateFunction::Correlation,
+ "approx_percentile_cont" => AggregateFunction::ApproxPercentileCont,
+ _ => { // unknown name, or a known name in non-lowercase form
+ return Err(DataFusionError::Plan(format!(
+ "There is no built-in function named {}",
+ name
+ )));
+ }
+ })
+ }
+}
diff --git a/datafusion-expr/src/lib.rs b/datafusion-expr/src/lib.rs
new file mode 100644
index 0000000..b6eaaf7
--- /dev/null
+++ b/datafusion-expr/src/lib.rs
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod aggregate_function;
+mod window_function;
+
+pub use aggregate_function::AggregateFunction;
+pub use window_function::{BuiltInWindowFunction, WindowFunction};
diff --git a/datafusion-expr/src/window_function.rs b/datafusion-expr/src/window_function.rs
new file mode 100644
index 0000000..2511874
--- /dev/null
+++ b/datafusion-expr/src/window_function.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aggregate_function::AggregateFunction;
+use datafusion_common::{DataFusionError, Result};
+use std::{fmt, str::FromStr};
+
+/// A window function: either an aggregate function used over a window, or a built-in window function
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum WindowFunction {
+ /// window function that leverages an aggregate function
+ AggregateFunction(AggregateFunction),
+ /// window function that leverages a built-in window function
+ BuiltInWindowFunction(BuiltInWindowFunction),
+}
+
+impl FromStr for WindowFunction {
+ type Err = DataFusionError;
+ fn from_str(name: &str) -> Result<WindowFunction> {
+ let name = name.to_lowercase(); // AggregateFunction::from_str only matches lowercase names
+ if let Ok(aggregate) = AggregateFunction::from_str(name.as_str()) { // aggregate names are tried first
+ Ok(WindowFunction::AggregateFunction(aggregate))
+ } else if let Ok(built_in_function) =
+ BuiltInWindowFunction::from_str(name.as_str())
+ {
+ Ok(WindowFunction::BuiltInWindowFunction(built_in_function))
+ } else {
+ Err(DataFusionError::Plan(format!( // the message shows the lowercased name
+ "There is no window function named {}",
+ name
+ )))
+ }
+ }
+}
+
+impl fmt::Display for BuiltInWindowFunction {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self { // upper-case SQL names, the same spellings BuiltInWindowFunction::from_str matches
+ BuiltInWindowFunction::RowNumber => write!(f, "ROW_NUMBER"),
+ BuiltInWindowFunction::Rank => write!(f, "RANK"),
+ BuiltInWindowFunction::DenseRank => write!(f, "DENSE_RANK"),
+ BuiltInWindowFunction::PercentRank => write!(f, "PERCENT_RANK"),
+ BuiltInWindowFunction::CumeDist => write!(f, "CUME_DIST"),
+ BuiltInWindowFunction::Ntile => write!(f, "NTILE"),
+ BuiltInWindowFunction::Lag => write!(f, "LAG"),
+ BuiltInWindowFunction::Lead => write!(f, "LEAD"),
+ BuiltInWindowFunction::FirstValue => write!(f, "FIRST_VALUE"),
+ BuiltInWindowFunction::LastValue => write!(f, "LAST_VALUE"),
+ BuiltInWindowFunction::NthValue => write!(f, "NTH_VALUE"),
+ }
+ }
+}
+
+impl fmt::Display for WindowFunction {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self { // delegate to the wrapped function's Display impl
+ WindowFunction::AggregateFunction(fun) => fun.fmt(f),
+ WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
+ }
+ }
+}
+
+/// A built-in window function (as opposed to an aggregate function used as a window function)
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum BuiltInWindowFunction {
+ /// number of the current row within its partition, counting from 1
+ RowNumber,
+ /// rank of the current row with gaps; same as row_number of its first peer
+ Rank,
+ /// rank of the current row without gaps; this function counts peer groups
+ DenseRank,
+ /// relative rank of the current row: (rank - 1) / (total rows - 1)
+ PercentRank,
+ /// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
+ CumeDist,
+ /// integer ranging from 1 to the argument value, dividing the partition as equally as possible
+ Ntile,
+ /// returns value evaluated at the row that is offset rows before the current row within the partition;
+ /// if there is no such row, instead return default (which must be of the same type as value).
+ /// Both offset and default are evaluated with respect to the current row.
+ /// If omitted, offset defaults to 1 and default to null
+ Lag,
+ /// returns value evaluated at the row that is offset rows after the current row within the partition;
+ /// if there is no such row, instead return default (which must be of the same type as value).
+ /// Both offset and default are evaluated with respect to the current row.
+ /// If omitted, offset defaults to 1 and default to null
+ Lead,
+ /// returns value evaluated at the row that is the first row of the window frame
+ FirstValue,
+ /// returns value evaluated at the row that is the last row of the window frame
+ LastValue,
+ /// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row
+ NthValue,
+}
+
+impl FromStr for BuiltInWindowFunction {
+ type Err = DataFusionError;
+ fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
+ Ok(match name.to_uppercase().as_str() { // case-insensitive: input is upper-cased before matching
+ "ROW_NUMBER" => BuiltInWindowFunction::RowNumber,
+ "RANK" => BuiltInWindowFunction::Rank,
+ "DENSE_RANK" => BuiltInWindowFunction::DenseRank,
+ "PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
+ "CUME_DIST" => BuiltInWindowFunction::CumeDist,
+ "NTILE" => BuiltInWindowFunction::Ntile,
+ "LAG" => BuiltInWindowFunction::Lag,
+ "LEAD" => BuiltInWindowFunction::Lead,
+ "FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
+ "LAST_VALUE" => BuiltInWindowFunction::LastValue,
+ "NTH_VALUE" => BuiltInWindowFunction::NthValue,
+ _ => { // the error message reports the name as given, not upper-cased
+ return Err(DataFusionError::Plan(format!(
+ "There is no built-in window function named {}",
+ name
+ )))
+ }
+ })
+ }
+}
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 180e037..51f78b4 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -51,6 +51,7 @@ avro = ["avro-rs", "num-traits", "datafusion-common/avro"]
[dependencies]
datafusion-common = { path = "../datafusion-common" }
+datafusion-expr = { path = "../datafusion-expr" }
ahash = { version = "0.7", default-features = false }
hashbrown = { version = "0.12", features = ["raw"] }
arrow = { version = "8.0.0", features = ["prettyprint"] }
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index 8fc94d3..a1531d4 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -38,7 +38,7 @@ use expressions::{
avg_return_type, correlation_return_type, covariance_return_type, stddev_return_type,
sum_return_type, variance_return_type,
};
-use std::{fmt, str::FromStr, sync::Arc};
+use std::sync::Arc;
/// the implementation of an aggregate function
pub type AccumulatorFunctionImplementation =
@@ -49,79 +49,7 @@ pub type AccumulatorFunctionImplementation =
pub type StateTypeFunction =
Arc<dyn Fn(&DataType) -> Result<Arc<Vec<DataType>>> + Send + Sync>;
-/// Enum of all built-in aggregate functions
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
-pub enum AggregateFunction {
- /// count
- Count,
- /// sum
- Sum,
- /// min
- Min,
- /// max
- Max,
- /// avg
- Avg,
- /// Approximate aggregate function
- ApproxDistinct,
- /// array_agg
- ArrayAgg,
- /// Variance (Sample)
- Variance,
- /// Variance (Population)
- VariancePop,
- /// Standard Deviation (Sample)
- Stddev,
- /// Standard Deviation (Population)
- StddevPop,
- /// Covariance (Sample)
- Covariance,
- /// Covariance (Population)
- CovariancePop,
- /// Correlation
- Correlation,
- /// Approximate continuous percentile function
- ApproxPercentileCont,
-}
-
-impl fmt::Display for AggregateFunction {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- // uppercase of the debug.
- write!(f, "{}", format!("{:?}", self).to_uppercase())
- }
-}
-
-impl FromStr for AggregateFunction {
- type Err = DataFusionError;
- fn from_str(name: &str) -> Result<AggregateFunction> {
- Ok(match name {
- "min" => AggregateFunction::Min,
- "max" => AggregateFunction::Max,
- "count" => AggregateFunction::Count,
- "avg" => AggregateFunction::Avg,
- "sum" => AggregateFunction::Sum,
- "approx_distinct" => AggregateFunction::ApproxDistinct,
- "array_agg" => AggregateFunction::ArrayAgg,
- "var" => AggregateFunction::Variance,
- "var_samp" => AggregateFunction::Variance,
- "var_pop" => AggregateFunction::VariancePop,
- "stddev" => AggregateFunction::Stddev,
- "stddev_samp" => AggregateFunction::Stddev,
- "stddev_pop" => AggregateFunction::StddevPop,
- "covar" => AggregateFunction::Covariance,
- "covar_samp" => AggregateFunction::Covariance,
- "covar_pop" => AggregateFunction::CovariancePop,
- "corr" => AggregateFunction::Correlation,
- "approx_percentile_cont" => AggregateFunction::ApproxPercentileCont,
- _ => {
- return Err(DataFusionError::Plan(format!(
- "There is no built-in function named {}",
- name
- )));
- }
- })
- }
-}
+pub use datafusion_expr::AggregateFunction;
/// Returns the datatype of the aggregate function.
/// This is used to get the returned data type for aggregate expr.
diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs
index 178a55a..1dcac3f 100644
--- a/datafusion/src/physical_plan/window_functions.rs
+++ b/datafusion/src/physical_plan/window_functions.rs
@@ -23,130 +23,17 @@
use crate::error::{DataFusionError, Result};
use crate::physical_plan::functions::{TypeSignature, Volatility};
use crate::physical_plan::{
- aggregates, aggregates::AggregateFunction, functions::Signature,
- type_coercion::data_types, windows::find_ranges_in_range, PhysicalExpr,
+ aggregates, functions::Signature, type_coercion::data_types,
+ windows::find_ranges_in_range, PhysicalExpr,
};
use arrow::array::ArrayRef;
use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
+pub use datafusion_expr::{BuiltInWindowFunction, WindowFunction};
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;
-use std::{fmt, str::FromStr};
-
-/// WindowFunction
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum WindowFunction {
- /// window function that leverages an aggregate function
- AggregateFunction(AggregateFunction),
- /// window function that leverages a built-in window function
- BuiltInWindowFunction(BuiltInWindowFunction),
-}
-
-impl FromStr for WindowFunction {
- type Err = DataFusionError;
- fn from_str(name: &str) -> Result<WindowFunction> {
- let name = name.to_lowercase();
- if let Ok(aggregate) = AggregateFunction::from_str(name.as_str()) {
- Ok(WindowFunction::AggregateFunction(aggregate))
- } else if let Ok(built_in_function) =
- BuiltInWindowFunction::from_str(name.as_str())
- {
- Ok(WindowFunction::BuiltInWindowFunction(built_in_function))
- } else {
- Err(DataFusionError::Plan(format!(
- "There is no window function named {}",
- name
- )))
- }
- }
-}
-
-impl fmt::Display for BuiltInWindowFunction {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- BuiltInWindowFunction::RowNumber => write!(f, "ROW_NUMBER"),
- BuiltInWindowFunction::Rank => write!(f, "RANK"),
- BuiltInWindowFunction::DenseRank => write!(f, "DENSE_RANK"),
- BuiltInWindowFunction::PercentRank => write!(f, "PERCENT_RANK"),
- BuiltInWindowFunction::CumeDist => write!(f, "CUME_DIST"),
- BuiltInWindowFunction::Ntile => write!(f, "NTILE"),
- BuiltInWindowFunction::Lag => write!(f, "LAG"),
- BuiltInWindowFunction::Lead => write!(f, "LEAD"),
- BuiltInWindowFunction::FirstValue => write!(f, "FIRST_VALUE"),
- BuiltInWindowFunction::LastValue => write!(f, "LAST_VALUE"),
- BuiltInWindowFunction::NthValue => write!(f, "NTH_VALUE"),
- }
- }
-}
-
-impl fmt::Display for WindowFunction {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- WindowFunction::AggregateFunction(fun) => fun.fmt(f),
- WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
- }
- }
-}
-
-/// An aggregate function that is part of a built-in window function
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub enum BuiltInWindowFunction {
- /// number of the current row within its partition, counting from 1
- RowNumber,
- /// rank of the current row with gaps; same as row_number of its first peer
- Rank,
- /// ank of the current row without gaps; this function counts peer groups
- DenseRank,
- /// relative rank of the current row: (rank - 1) / (total rows - 1)
- PercentRank,
- /// relative rank of the current row: (number of rows preceding or peer with current row) / (total rows)
- CumeDist,
- /// integer ranging from 1 to the argument value, dividing the partition as equally as possible
- Ntile,
- /// returns value evaluated at the row that is offset rows before the current row within the partition;
- /// if there is no such row, instead return default (which must be of the same type as value).
- /// Both offset and default are evaluated with respect to the current row.
- /// If omitted, offset defaults to 1 and default to null
- Lag,
- /// returns value evaluated at the row that is offset rows after the current row within the partition;
- /// if there is no such row, instead return default (which must be of the same type as value).
- /// Both offset and default are evaluated with respect to the current row.
- /// If omitted, offset defaults to 1 and default to null
- Lead,
- /// returns value evaluated at the row that is the first row of the window frame
- FirstValue,
- /// returns value evaluated at the row that is the last row of the window frame
- LastValue,
- /// returns value evaluated at the row that is the nth row of the window frame (counting from 1); null if no such row
- NthValue,
-}
-
-impl FromStr for BuiltInWindowFunction {
- type Err = DataFusionError;
- fn from_str(name: &str) -> Result<BuiltInWindowFunction> {
- Ok(match name.to_uppercase().as_str() {
- "ROW_NUMBER" => BuiltInWindowFunction::RowNumber,
- "RANK" => BuiltInWindowFunction::Rank,
- "DENSE_RANK" => BuiltInWindowFunction::DenseRank,
- "PERCENT_RANK" => BuiltInWindowFunction::PercentRank,
- "CUME_DIST" => BuiltInWindowFunction::CumeDist,
- "NTILE" => BuiltInWindowFunction::Ntile,
- "LAG" => BuiltInWindowFunction::Lag,
- "LEAD" => BuiltInWindowFunction::Lead,
- "FIRST_VALUE" => BuiltInWindowFunction::FirstValue,
- "LAST_VALUE" => BuiltInWindowFunction::LastValue,
- "NTH_VALUE" => BuiltInWindowFunction::NthValue,
- _ => {
- return Err(DataFusionError::Plan(format!(
- "There is no built-in window function named {}",
- name
- )))
- }
- })
- }
-}
/// Returns the datatype of the window function
pub fn return_type(