You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/16 15:33:39 UTC
[arrow-datafusion] 01/02: physical sort crate
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch physical-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 8009f38e5f00c25ced75371a5e0d73c792765059
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Wed Feb 16 23:29:44 2022 +0800
physical sort crate
---
Cargo.toml | 1 +
Cargo.toml => datafusion-physical-expr/Cargo.toml | 39 ++++---
datafusion-physical-expr/README.md | 24 ++++
datafusion-physical-expr/src/aggregate_expr.rs | 61 ++++++++++
datafusion-physical-expr/src/lib.rs | 26 +++++
datafusion-physical-expr/src/physical_expr.rs | 42 +++++++
datafusion-physical-expr/src/sort_expr.rs | 65 +++++++++++
datafusion-physical-expr/src/window_expr.rs | 121 ++++++++++++++++++++
datafusion/Cargo.toml | 1 +
datafusion/src/physical_plan/expressions/mod.rs | 42 +------
datafusion/src/physical_plan/mod.rs | 133 +---------------------
11 files changed, 365 insertions(+), 190 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index f74f53c..beaa22d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ members = [
"datafusion",
"datafusion-common",
"datafusion-expr",
+ "datafusion-physical-expr",
"datafusion-cli",
"datafusion-examples",
"benchmarks",
diff --git a/Cargo.toml b/datafusion-physical-expr/Cargo.toml
similarity index 51%
copy from Cargo.toml
copy to datafusion-physical-expr/Cargo.toml
index f74f53c..ba12b58 100644
--- a/Cargo.toml
+++ b/datafusion-physical-expr/Cargo.toml
@@ -15,21 +15,26 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-members = [
- "datafusion",
- "datafusion-common",
- "datafusion-expr",
- "datafusion-cli",
- "datafusion-examples",
- "benchmarks",
- "ballista/rust/client",
- "ballista/rust/core",
- "ballista/rust/executor",
- "ballista/rust/scheduler",
- "ballista-examples",
-]
+[package]
+name = "datafusion-physical-expr"
+description = "DataFusion is an in-memory query engine that uses Apache Arrow as the memory model"
+version = "7.0.0"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+readme = "../README.md"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "query", "sql" ]
+edition = "2021"
+rust-version = "1.58"
-[profile.release]
-lto = true
-codegen-units = 1
+[lib]
+name = "datafusion_physical_expr"
+path = "src/lib.rs"
+
+[features]
+
+[dependencies]
+datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
+datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" }
+arrow = { version = "9.0.0", features = ["prettyprint"] }
diff --git a/datafusion-physical-expr/README.md b/datafusion-physical-expr/README.md
new file mode 100644
index 0000000..9c92023
--- /dev/null
+++ b/datafusion-physical-expr/README.md
@@ -0,0 +1,24 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# DataFusion Physical Expr
+
+This is an internal module for fundamental physical expression types of [DataFusion][df].
+
+[df]: https://crates.io/crates/datafusion
diff --git a/datafusion-physical-expr/src/aggregate_expr.rs b/datafusion-physical-expr/src/aggregate_expr.rs
new file mode 100644
index 0000000..c6ff312
--- /dev/null
+++ b/datafusion-physical-expr/src/aggregate_expr.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalExpr;
+use arrow::compute::kernels::partition::lexicographical_partition_ranges;
+use arrow::compute::kernels::sort::{SortColumn, SortOptions};
+use arrow::error::Result as ArrowResult;
+use arrow::{array::ArrayRef, datatypes::Field};
+use datafusion_common::Result;
+use datafusion_expr::Accumulator;
+use std::fmt::Debug;
+use std::ops::Range;
+use std::sync::Arc;
+use std::{any::Any, pin::Pin};
+
+/// An aggregate expression that:
+/// * knows its resulting field
+/// * knows how to create its accumulator
+/// * knows its accumulator's state's field
+/// * knows the expressions from which its accumulator will receive values
+pub trait AggregateExpr: Send + Sync + Debug {
+ /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
+
+ /// the field of the final result of this aggregation.
+ fn field(&self) -> Result<Field>;
+
+ /// the accumulator used to accumulate values from the expressions.
+ /// the accumulator expects the same number of arguments as `expressions` and must
+ /// return states with the same description as `state_fields`
+ fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
+
+ /// the fields that encapsulate the Accumulator's state
+ /// the number of fields here equals the number of states that the accumulator contains
+ fn state_fields(&self) -> Result<Vec<Field>>;
+
+ /// expressions that are passed to the Accumulator.
+ /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
+ fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+ /// Human readable name such as `"MIN(c2)"`. The default
+ /// implementation returns placeholder text.
+ fn name(&self) -> &str {
+ "AggregateExpr: default name"
+ }
+}
diff --git a/datafusion-physical-expr/src/lib.rs b/datafusion-physical-expr/src/lib.rs
new file mode 100644
index 0000000..63edaa5
--- /dev/null
+++ b/datafusion-physical-expr/src/lib.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod aggregate_expr;
+mod physical_expr;
+mod sort_expr;
+mod window_expr;
+
+pub use aggregate_expr::AggregateExpr;
+pub use physical_expr::PhysicalExpr;
+pub use sort_expr::PhysicalSortExpr;
+pub use window_expr::WindowExpr;
diff --git a/datafusion-physical-expr/src/physical_expr.rs b/datafusion-physical-expr/src/physical_expr.rs
new file mode 100644
index 0000000..c9ab578
--- /dev/null
+++ b/datafusion-physical-expr/src/physical_expr.rs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::compute::kernels::sort::{SortColumn, SortOptions};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use arrow::{array::ArrayRef, datatypes::Field};
+use datafusion_common::Result;
+use datafusion_expr::Accumulator;
+use datafusion_expr::ColumnarValue;
+use std::fmt::{Debug, Display};
+use std::sync::Arc;
+use std::{any::Any, pin::Pin};
+
+/// Expression that can be evaluated against a RecordBatch.
+/// A physical expression knows its type, nullability and how to evaluate itself.
+pub trait PhysicalExpr: Send + Sync + Display + Debug {
+ /// Returns the physical expression as [`Any`](std::any::Any) so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
+ /// Get the data type of this expression, given the schema of the input
+ fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
+ /// Determine whether this expression is nullable, given the schema of the input
+ fn nullable(&self, input_schema: &Schema) -> Result<bool>;
+ /// Evaluate an expression against a RecordBatch
+ fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+}
diff --git a/datafusion-physical-expr/src/sort_expr.rs b/datafusion-physical-expr/src/sort_expr.rs
new file mode 100644
index 0000000..58f77de
--- /dev/null
+++ b/datafusion-physical-expr/src/sort_expr.rs
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalExpr;
+use arrow::compute::kernels::sort::{SortColumn, SortOptions};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::ColumnarValue;
+use std::sync::Arc;
+
+/// Represents Sort operation for a column in a RecordBatch
+#[derive(Clone, Debug)]
+pub struct PhysicalSortExpr {
+ /// Physical expression representing the column to sort
+ pub expr: Arc<dyn PhysicalExpr>,
+ /// Option to specify how the given column should be sorted
+ pub options: SortOptions,
+}
+
+impl std::fmt::Display for PhysicalSortExpr {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ let opts_string = match (self.options.descending, self.options.nulls_first) {
+ (true, true) => "DESC",
+ (true, false) => "DESC NULLS LAST",
+ (false, true) => "ASC",
+ (false, false) => "ASC NULLS LAST",
+ };
+
+ write!(f, "{} {}", self.expr, opts_string)
+ }
+}
+
+impl PhysicalSortExpr {
+ /// evaluate the sort expression into SortColumn that can be passed into arrow sort kernel
+ pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
+ let value_to_sort = self.expr.evaluate(batch)?;
+ let array_to_sort = match value_to_sort {
+ ColumnarValue::Array(array) => array,
+ ColumnarValue::Scalar(scalar) => {
+ return Err(DataFusionError::Plan(format!(
+ "Sort operation is not applicable to scalar value {}",
+ scalar
+ )));
+ }
+ };
+ Ok(SortColumn {
+ values: array_to_sort,
+ options: Some(self.options),
+ })
+ }
+}
diff --git a/datafusion-physical-expr/src/window_expr.rs b/datafusion-physical-expr/src/window_expr.rs
new file mode 100644
index 0000000..4d14aee
--- /dev/null
+++ b/datafusion-physical-expr/src/window_expr.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PhysicalExpr, PhysicalSortExpr};
+use arrow::compute::kernels::partition::lexicographical_partition_ranges;
+use arrow::compute::kernels::sort::{SortColumn, SortOptions};
+use arrow::datatypes::{DataType, Schema, SchemaRef};
+use arrow::error::Result as ArrowResult;
+use arrow::record_batch::RecordBatch;
+use arrow::{array::ArrayRef, datatypes::Field};
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::Accumulator;
+use datafusion_expr::ColumnarValue;
+use std::fmt::{Debug, Display};
+use std::ops::Range;
+use std::sync::Arc;
+use std::{any::Any, pin::Pin};
+
+/// A window expression that:
+/// * knows its resulting field
+pub trait WindowExpr: Send + Sync + Debug {
+ /// Returns the window expression as [`Any`](std::any::Any) so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
+
+ /// the field of the final result of this window function.
+ fn field(&self) -> Result<Field>;
+
+ /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
+ /// implementation returns placeholder text.
+ fn name(&self) -> &str {
+ "WindowExpr: default name"
+ }
+
+ /// expressions that are passed to the WindowAccumulator.
+ /// Functions which take a single input argument, such as `sum`, return a single [`PhysicalExpr`],
+ /// others (e.g. `cov`) return many.
+ fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+ /// evaluate the window function arguments against the batch and return one array ref
+ /// per argument expression; normally the resulting vec has a single element.
+ fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
+ self
+ .expressions()
+ .iter()
+ .map(|e| e.evaluate(batch))
+ .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .collect()
+ }
+
+ /// evaluate the window function values against the batch
+ fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
+
+ /// evaluate the partition points given the partition columns; if the partition columns are
+ /// empty then the result will be a single element vec with one range covering all rows.
+ fn evaluate_partition_points(
+ &self,
+ num_rows: usize,
+ partition_columns: &[SortColumn],
+ ) -> Result<Vec<Range<usize>>> {
+ if partition_columns.is_empty() {
+ Ok(vec![Range {
+ start: 0,
+ end: num_rows,
+ }])
+ } else {
+ Ok(
+ lexicographical_partition_ranges(partition_columns)
+ .map_err(DataFusionError::ArrowError)?
+ .collect::<Vec<_>>(),
+ )
+ }
+ }
+
+ /// expressions from the window function's partition by clause, empty if absent
+ fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
+
+ /// expressions from the window function's order by clause, empty if absent
+ fn order_by(&self) -> &[PhysicalSortExpr];
+
+ /// get partition columns that can be used for partitioning, empty if absent
+ fn partition_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
+ self
+ .partition_by()
+ .iter()
+ .map(|expr| {
+ PhysicalSortExpr {
+ expr: expr.clone(),
+ options: SortOptions::default(),
+ }
+ .evaluate_to_sort_column(batch)
+ })
+ .collect()
+ }
+
+ /// get sort columns that can be used for peer evaluation, empty if absent
+ fn sort_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
+ let mut sort_columns = self.partition_columns(batch)?;
+ let order_by_columns = self
+ .order_by()
+ .iter()
+ .map(|e| e.evaluate_to_sort_column(batch))
+ .collect::<Result<Vec<SortColumn>>>()?;
+ sort_columns.extend(order_by_columns);
+ Ok(sort_columns)
+ }
+}
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 7da50ad..cbba899 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -54,6 +54,7 @@ row = []
[dependencies]
datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" }
+datafusion-physical-expr = { path = "../datafusion-physical-expr", version = "7.0.0" }
ahash = { version = "0.7", default-features = false }
hashbrown = { version = "0.12", features = ["raw"] }
arrow = { version = "9.0.0", features = ["prettyprint"] }
diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs
index 06afe00..c4a3c3e 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -118,47 +118,7 @@ pub fn format_state_name(name: &str, state_name: &str) -> String {
format!("{}[{}]", name, state_name)
}
-/// Represents Sort operation for a column in a RecordBatch
-#[derive(Clone, Debug)]
-pub struct PhysicalSortExpr {
- /// Physical expression representing the column to sort
- pub expr: Arc<dyn PhysicalExpr>,
- /// Option to specify how the given column should be sorted
- pub options: SortOptions,
-}
-
-impl std::fmt::Display for PhysicalSortExpr {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- let opts_string = match (self.options.descending, self.options.nulls_first) {
- (true, true) => "DESC",
- (true, false) => "DESC NULLS LAST",
- (false, true) => "ASC",
- (false, false) => "ASC NULLS LAST",
- };
-
- write!(f, "{} {}", self.expr, opts_string)
- }
-}
-
-impl PhysicalSortExpr {
- /// evaluate the sort expression into SortColumn that can be passed into arrow sort kernel
- pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
- let value_to_sort = self.expr.evaluate(batch)?;
- let array_to_sort = match value_to_sort {
- ColumnarValue::Array(array) => array,
- ColumnarValue::Scalar(scalar) => {
- return Err(DataFusionError::Plan(format!(
- "Sort operation is not applicable to scalar value {}",
- scalar
- )));
- }
- };
- Ok(SortColumn {
- values: array_to_sort,
- options: Some(self.options),
- })
- }
-}
+pub use datafusion_physical_expr::PhysicalSortExpr;
#[cfg(test)]
mod tests {
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index cb9ae64..bed4bb3 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -474,138 +474,7 @@ pub enum Distribution {
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}
-/// Expression that can be evaluated against a RecordBatch
-/// A Physical expression knows its type, nullability and how to evaluate itself.
-pub trait PhysicalExpr: Send + Sync + Display + Debug {
- /// Returns the physical expression as [`Any`](std::any::Any) so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
- /// Get the data type of this expression, given the schema of the input
- fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
- /// Determine whether this expression is nullable, given the schema of the input
- fn nullable(&self, input_schema: &Schema) -> Result<bool>;
- /// Evaluate an expression against a RecordBatch
- fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
-}
-
-/// An aggregate expression that:
-/// * knows its resulting field
-/// * knows how to create its accumulator
-/// * knows its accumulator's state's field
-/// * knows the expressions from whose its accumulator will receive values
-pub trait AggregateExpr: Send + Sync + Debug {
- /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
-
- /// the field of the final result of this aggregation.
- fn field(&self) -> Result<Field>;
-
- /// the accumulator used to accumulate values from the expressions.
- /// the accumulator expects the same number of arguments as `expressions` and must
- /// return states with the same description as `state_fields`
- fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
-
- /// the fields that encapsulate the Accumulator's state
- /// the number of fields here equals the number of states that the accumulator contains
- fn state_fields(&self) -> Result<Vec<Field>>;
-
- /// expressions that are passed to the Accumulator.
- /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
- /// Human readable name such as `"MIN(c2)"`. The default
- /// implementation returns placeholder text.
- fn name(&self) -> &str {
- "AggregateExpr: default name"
- }
-}
-
-/// A window expression that:
-/// * knows its resulting field
-pub trait WindowExpr: Send + Sync + Debug {
- /// Returns the window expression as [`Any`](std::any::Any) so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
-
- /// the field of the final result of this window function.
- fn field(&self) -> Result<Field>;
-
- /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
- /// implementation returns placeholder text.
- fn name(&self) -> &str {
- "WindowExpr: default name"
- }
-
- /// expressions that are passed to the WindowAccumulator.
- /// Functions which take a single input argument, such as `sum`, return a single [`datafusion_expr::expr::Expr`],
- /// others (e.g. `cov`) return many.
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
- /// evaluate the window function arguments against the batch and return
- /// array ref, normally the resulting vec is a single element one.
- fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
- self.expressions()
- .iter()
- .map(|e| e.evaluate(batch))
- .map(|r| r.map(|v| v.into_array(batch.num_rows())))
- .collect()
- }
-
- /// evaluate the window function values against the batch
- fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
-
- /// evaluate the partition points given the sort columns; if the sort columns are
- /// empty then the result will be a single element vec of the whole column rows.
- fn evaluate_partition_points(
- &self,
- num_rows: usize,
- partition_columns: &[SortColumn],
- ) -> Result<Vec<Range<usize>>> {
- if partition_columns.is_empty() {
- Ok(vec![Range {
- start: 0,
- end: num_rows,
- }])
- } else {
- Ok(lexicographical_partition_ranges(partition_columns)
- .map_err(DataFusionError::ArrowError)?
- .collect::<Vec<_>>())
- }
- }
-
- /// expressions that's from the window function's partition by clause, empty if absent
- fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
-
- /// expressions that's from the window function's order by clause, empty if absent
- fn order_by(&self) -> &[PhysicalSortExpr];
-
- /// get partition columns that can be used for partitioning, empty if absent
- fn partition_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
- self.partition_by()
- .iter()
- .map(|expr| {
- PhysicalSortExpr {
- expr: expr.clone(),
- options: SortOptions::default(),
- }
- .evaluate_to_sort_column(batch)
- })
- .collect()
- }
-
- /// get sort columns that can be used for peer evaluation, empty if absent
- fn sort_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
- let mut sort_columns = self.partition_columns(batch)?;
- let order_by_columns = self
- .order_by()
- .iter()
- .map(|e| e.evaluate_to_sort_column(batch))
- .collect::<Result<Vec<SortColumn>>>()?;
- sort_columns.extend(order_by_columns);
- Ok(sort_columns)
- }
-}
+pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, WindowExpr};
/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema