You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/16 15:35:23 UTC

[arrow-datafusion] 01/01: physical sort crate

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch physical-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit fda9b3a9fbe559369847dc1e9c1565b03442818b
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Wed Feb 16 23:29:44 2022 +0800

    physical sort crate
---
 Cargo.toml                                        |   1 +
 Cargo.toml => datafusion-physical-expr/Cargo.toml |  39 +++---
 datafusion-physical-expr/README.md                |  24 ++++
 datafusion-physical-expr/src/aggregate_expr.rs    |  61 +++++++++
 datafusion-physical-expr/src/lib.rs               |  26 ++++
 datafusion-physical-expr/src/physical_expr.rs     |  42 +++++++
 datafusion-physical-expr/src/sort_expr.rs         |  65 ++++++++++
 datafusion-physical-expr/src/window_expr.rs       | 117 +++++++++++++++++
 datafusion/Cargo.toml                             |   1 +
 datafusion/src/physical_plan/expressions/mod.rs   |  53 +-------
 datafusion/src/physical_plan/mod.rs               | 147 ++--------------------
 11 files changed, 371 insertions(+), 205 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index f74f53c..beaa22d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,6 +20,7 @@ members = [
     "datafusion",
     "datafusion-common",
     "datafusion-expr",
+    "datafusion-physical-expr",
     "datafusion-cli",
     "datafusion-examples",
     "benchmarks",
diff --git a/Cargo.toml b/datafusion-physical-expr/Cargo.toml
similarity index 51%
copy from Cargo.toml
copy to datafusion-physical-expr/Cargo.toml
index f74f53c..ba12b58 100644
--- a/Cargo.toml
+++ b/datafusion-physical-expr/Cargo.toml
@@ -15,21 +15,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-members = [
-    "datafusion",
-    "datafusion-common",
-    "datafusion-expr",
-    "datafusion-cli",
-    "datafusion-examples",
-    "benchmarks",
-    "ballista/rust/client",
-    "ballista/rust/core",
-    "ballista/rust/executor",
-    "ballista/rust/scheduler",
-    "ballista-examples",
-]
+[package]
+name = "datafusion-physical-expr"
+description = "DataFusion is an in-memory query engine that uses Apache Arrow as the memory model"
+version = "7.0.0"
+homepage = "https://github.com/apache/arrow-datafusion"
+repository = "https://github.com/apache/arrow-datafusion"
+readme = "../README.md"
+authors = ["Apache Arrow <de...@arrow.apache.org>"]
+license = "Apache-2.0"
+keywords = [ "arrow", "query", "sql" ]
+edition = "2021"
+rust-version = "1.58"
 
-[profile.release]
-lto = true
-codegen-units = 1
+[lib]
+name = "datafusion_physical_expr"
+path = "src/lib.rs"
+
+[features]
+
+[dependencies]
+datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
+datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" }
+arrow = { version = "9.0.0", features = ["prettyprint"] }
diff --git a/datafusion-physical-expr/README.md b/datafusion-physical-expr/README.md
new file mode 100644
index 0000000..9c92023
--- /dev/null
+++ b/datafusion-physical-expr/README.md
@@ -0,0 +1,24 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion Physical Expr
+
+This is an internal module for fundamental physical expression types of [DataFusion][df].
+
+[df]: https://crates.io/crates/datafusion
diff --git a/datafusion-physical-expr/src/aggregate_expr.rs b/datafusion-physical-expr/src/aggregate_expr.rs
new file mode 100644
index 0000000..428c09b
--- /dev/null
+++ b/datafusion-physical-expr/src/aggregate_expr.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalExpr;
+
+
+
+use arrow::{datatypes::Field};
+use datafusion_common::Result;
+use datafusion_expr::Accumulator;
+use std::fmt::Debug;
+
+use std::sync::Arc;
+use std::{any::Any};
+
+/// An aggregate expression that:
+/// * knows its resulting field
+/// * knows how to create its accumulator
+/// * knows the fields of its accumulator's state
+/// * knows the expressions from which its accumulator will receive values
+pub trait AggregateExpr: Send + Sync + Debug {
+    /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Returns the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field>;
+
+    /// Creates the accumulator used to accumulate values from the expressions.
+    /// The accumulator expects the same number of arguments as `expressions` and must
+    /// return states with the same description as `state_fields`.
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
+
+    /// Returns the fields that encapsulate the accumulator's state; their count
+    /// equals the number of states that the accumulator contains.
+    fn state_fields(&self) -> Result<Vec<Field>>;
+
+    /// Returns the expressions whose values are passed to the accumulator. Single-column
+    /// aggregations such as `sum` return one expression; others (e.g. `cov`) return many.
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+    /// Human readable name such as `"MIN(c2)"`. The default
+    /// implementation returns placeholder text.
+    fn name(&self) -> &str {
+        "AggregateExpr: default name"
+    }
+}
diff --git a/datafusion-physical-expr/src/lib.rs b/datafusion-physical-expr/src/lib.rs
new file mode 100644
index 0000000..63edaa5
--- /dev/null
+++ b/datafusion-physical-expr/src/lib.rs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod aggregate_expr;
+mod physical_expr;
+mod sort_expr;
+mod window_expr;
+
+pub use aggregate_expr::AggregateExpr;
+pub use physical_expr::PhysicalExpr;
+pub use sort_expr::PhysicalSortExpr;
+pub use window_expr::WindowExpr;
diff --git a/datafusion-physical-expr/src/physical_expr.rs b/datafusion-physical-expr/src/physical_expr.rs
new file mode 100644
index 0000000..875c253
--- /dev/null
+++ b/datafusion-physical-expr/src/physical_expr.rs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+use arrow::datatypes::{DataType, Schema};
+
+use arrow::record_batch::RecordBatch;
+
+use datafusion_common::Result;
+
+use datafusion_expr::ColumnarValue;
+use std::fmt::{Debug, Display};
+
+use std::{any::Any};
+
+/// Expression that can be evaluated against a [`RecordBatch`].
+/// A physical expression knows its type, nullability and how to evaluate itself.
+pub trait PhysicalExpr: Send + Sync + Display + Debug {
+    /// Returns the physical expression as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+    /// Returns the data type of this expression, given the schema of the input.
+    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
+    /// Returns whether this expression is nullable, given the schema of the input.
+    fn nullable(&self, input_schema: &Schema) -> Result<bool>;
+    /// Evaluates this expression against `batch`, yielding an array or a scalar.
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
+}
diff --git a/datafusion-physical-expr/src/sort_expr.rs b/datafusion-physical-expr/src/sort_expr.rs
new file mode 100644
index 0000000..58f77de
--- /dev/null
+++ b/datafusion-physical-expr/src/sort_expr.rs
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalExpr;
+use arrow::compute::kernels::sort::{SortColumn, SortOptions};
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::ColumnarValue;
+use std::sync::Arc;
+
+/// Represents a sort operation for a column in a [`RecordBatch`]
+#[derive(Clone, Debug)]
+pub struct PhysicalSortExpr {
+    /// Physical expression representing the column to sort
+    pub expr: Arc<dyn PhysicalExpr>,
+    /// Option to specify how the given column should be sorted
+    pub options: SortOptions,
+}
+
+impl std::fmt::Display for PhysicalSortExpr {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let opts_string = match (self.options.descending, self.options.nulls_first) {
+            (true, true) => "DESC",
+            (true, false) => "DESC NULLS LAST",
+            (false, true) => "ASC",
+            (false, false) => "ASC NULLS LAST",
+        };
+
+        write!(f, "{} {}", self.expr, opts_string)
+    }
+}
+
+impl PhysicalSortExpr {
+    /// Evaluates the sort expression into a [`SortColumn`] that can be passed to the
+    /// arrow sort kernel. Returns `DataFusionError::Plan` if it evaluates to a scalar.
+    pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
+        let array_to_sort = match self.expr.evaluate(batch)? {
+            ColumnarValue::Array(array) => array,
+            ColumnarValue::Scalar(scalar) => {
+                return Err(DataFusionError::Plan(format!(
+                    "Sort operation is not applicable to scalar value {}",
+                    scalar
+                )));
+            }
+        };
+        Ok(SortColumn {
+            values: array_to_sort,
+            options: Some(self.options),
+        })
+    }
+}
diff --git a/datafusion-physical-expr/src/window_expr.rs b/datafusion-physical-expr/src/window_expr.rs
new file mode 100644
index 0000000..4ded986
--- /dev/null
+++ b/datafusion-physical-expr/src/window_expr.rs
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{PhysicalExpr, PhysicalSortExpr};
+use arrow::compute::kernels::partition::lexicographical_partition_ranges;
+use arrow::compute::kernels::sort::{SortColumn, SortOptions};
+use arrow::record_batch::RecordBatch;
+use arrow::{array::ArrayRef, datatypes::Field};
+use datafusion_common::{DataFusionError, Result};
+use std::any::Any;
+use std::fmt::Debug;
+use std::ops::Range;
+use std::sync::Arc;
+
+/// A window expression that:
+/// * knows its resulting field
+pub trait WindowExpr: Send + Sync + Debug {
+    /// Returns the window expression as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Returns the field of the final result of this window function.
+    fn field(&self) -> Result<Field>;
+
+    /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
+    /// implementation returns placeholder text.
+    fn name(&self) -> &str {
+        "WindowExpr: default name"
+    }
+
+    /// Expressions that are passed to the WindowAccumulator. Functions which take a single
+    /// input argument, such as `sum`, return a single [`PhysicalExpr`]; others (e.g. `cov`)
+    /// return many.
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+    /// Evaluates the window function arguments against the batch and returns
+    /// array refs; normally the resulting vec has a single element.
+    /// Scalar results are expanded into arrays of `batch.num_rows()` rows.
+    fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
+        self.expressions()
+            .iter()
+            .map(|e| e.evaluate(batch))
+            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .collect()
+    }
+
+    /// Evaluates the window function values against the batch.
+    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
+
+    /// Evaluates the partition points given the sort columns; if the sort columns are
+    /// empty then the result is a single range covering all `num_rows` rows.
+    /// Arrow partition kernel errors are wrapped in `DataFusionError::ArrowError`.
+    fn evaluate_partition_points(
+        &self,
+        num_rows: usize,
+        partition_columns: &[SortColumn],
+    ) -> Result<Vec<Range<usize>>> {
+        if partition_columns.is_empty() {
+            Ok(vec![Range {
+                start: 0,
+                end: num_rows,
+            }])
+        } else {
+            Ok(lexicographical_partition_ranges(partition_columns)
+                .map_err(DataFusionError::ArrowError)?
+                .collect::<Vec<_>>())
+        }
+    }
+
+    /// Expressions from the window function's partition by clause, empty if absent.
+    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
+
+    /// Expressions from the window function's order by clause, empty if absent.
+    fn order_by(&self) -> &[PhysicalSortExpr];
+
+    /// Gets partition columns that can be used for partitioning, empty if absent.
+    /// Each partition by expression is evaluated with default [`SortOptions`].
+    fn partition_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
+        self.partition_by()
+            .iter()
+            .map(|expr| {
+                PhysicalSortExpr {
+                    expr: expr.clone(),
+                    options: SortOptions::default(),
+                }
+                .evaluate_to_sort_column(batch)
+            })
+            .collect()
+    }
+
+    /// Gets sort columns that can be used for peer evaluation, empty if absent.
+    /// The partition by columns come first, followed by the order by columns.
+    fn sort_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
+        let mut sort_columns = self.partition_columns(batch)?;
+        let order_by_columns = self
+            .order_by()
+            .iter()
+            .map(|e| e.evaluate_to_sort_column(batch))
+            .collect::<Result<Vec<SortColumn>>>()?;
+        sort_columns.extend(order_by_columns);
+        Ok(sort_columns)
+    }
+}
diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 7da50ad..cbba899 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -54,6 +54,7 @@ row = []
 [dependencies]
 datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
 datafusion-expr = { path = "../datafusion-expr", version = "7.0.0" }
+datafusion-physical-expr = { path = "../datafusion-physical-expr", version = "7.0.0" }
 ahash = { version = "0.7", default-features = false }
 hashbrown = { version = "0.12", features = ["raw"] }
 arrow = { version = "9.0.0", features = ["prettyprint"] }
diff --git a/datafusion/src/physical_plan/expressions/mod.rs b/datafusion/src/physical_plan/expressions/mod.rs
index 06afe00..567e87c 100644
--- a/datafusion/src/physical_plan/expressions/mod.rs
+++ b/datafusion/src/physical_plan/expressions/mod.rs
@@ -17,13 +17,7 @@
 
 //! Defines physical expressions that can evaluated at runtime during query execution
 
-use std::sync::Arc;
-
-use super::ColumnarValue;
-use crate::error::{DataFusionError, Result};
-use crate::physical_plan::PhysicalExpr;
-use arrow::compute::kernels::sort::{SortColumn, SortOptions};
-use arrow::record_batch::RecordBatch;
+use datafusion_expr::ColumnarValue;
 
 mod approx_distinct;
 mod approx_percentile_cont;
@@ -118,52 +112,13 @@ pub fn format_state_name(name: &str, state_name: &str) -> String {
     format!("{}[{}]", name, state_name)
 }
 
-/// Represents Sort operation for a column in a RecordBatch
-#[derive(Clone, Debug)]
-pub struct PhysicalSortExpr {
-    /// Physical expression representing the column to sort
-    pub expr: Arc<dyn PhysicalExpr>,
-    /// Option to specify how the given column should be sorted
-    pub options: SortOptions,
-}
-
-impl std::fmt::Display for PhysicalSortExpr {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        let opts_string = match (self.options.descending, self.options.nulls_first) {
-            (true, true) => "DESC",
-            (true, false) => "DESC NULLS LAST",
-            (false, true) => "ASC",
-            (false, false) => "ASC NULLS LAST",
-        };
-
-        write!(f, "{} {}", self.expr, opts_string)
-    }
-}
-
-impl PhysicalSortExpr {
-    /// evaluate the sort expression into SortColumn that can be passed into arrow sort kernel
-    pub fn evaluate_to_sort_column(&self, batch: &RecordBatch) -> Result<SortColumn> {
-        let value_to_sort = self.expr.evaluate(batch)?;
-        let array_to_sort = match value_to_sort {
-            ColumnarValue::Array(array) => array,
-            ColumnarValue::Scalar(scalar) => {
-                return Err(DataFusionError::Plan(format!(
-                    "Sort operation is not applicable to scalar value {}",
-                    scalar
-                )));
-            }
-        };
-        Ok(SortColumn {
-            values: array_to_sort,
-            options: Some(self.options),
-        })
-    }
-}
+pub use datafusion_physical_expr::PhysicalSortExpr;
 
 #[cfg(test)]
 mod tests {
-    use super::*;
     use crate::{error::Result, physical_plan::AggregateExpr, scalar::ScalarValue};
+    use arrow::record_batch::RecordBatch;
+    use std::sync::Arc;
 
     /// macro to perform an aggregation and verify the result.
     #[macro_export]
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index cb9ae64..b2f91e8 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -24,24 +24,24 @@ use self::{
 };
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::{
-    error::{DataFusionError, Result},
+    error::{Result},
     execution::runtime_env::RuntimeEnv,
     scalar::ScalarValue,
 };
-use arrow::compute::kernels::partition::lexicographical_partition_ranges;
-use arrow::compute::kernels::sort::{SortColumn, SortOptions};
-use arrow::datatypes::{DataType, Schema, SchemaRef};
+
+
+use arrow::datatypes::{SchemaRef};
 use arrow::error::Result as ArrowResult;
 use arrow::record_batch::RecordBatch;
-use arrow::{array::ArrayRef, datatypes::Field};
+
 use async_trait::async_trait;
 pub use datafusion_expr::Accumulator;
 pub use datafusion_expr::ColumnarValue;
 pub use display::DisplayFormatType;
 use futures::stream::Stream;
 use std::fmt;
-use std::fmt::{Debug, Display};
-use std::ops::Range;
+use std::fmt::{Debug};
+
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::{any::Any, pin::Pin};
@@ -474,138 +474,7 @@ pub enum Distribution {
     HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
 }
 
-/// Expression that can be evaluated against a RecordBatch
-/// A Physical expression knows its type, nullability and how to evaluate itself.
-pub trait PhysicalExpr: Send + Sync + Display + Debug {
-    /// Returns the physical expression as [`Any`](std::any::Any) so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-    /// Get the data type of this expression, given the schema of the input
-    fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
-    /// Determine whether this expression is nullable, given the schema of the input
-    fn nullable(&self, input_schema: &Schema) -> Result<bool>;
-    /// Evaluate an expression against a RecordBatch
-    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
-}
-
-/// An aggregate expression that:
-/// * knows its resulting field
-/// * knows how to create its accumulator
-/// * knows its accumulator's state's field
-/// * knows the expressions from whose its accumulator will receive values
-pub trait AggregateExpr: Send + Sync + Debug {
-    /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-
-    /// the field of the final result of this aggregation.
-    fn field(&self) -> Result<Field>;
-
-    /// the accumulator used to accumulate values from the expressions.
-    /// the accumulator expects the same number of arguments as `expressions` and must
-    /// return states with the same description as `state_fields`
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
-
-    /// the fields that encapsulate the Accumulator's state
-    /// the number of fields here equals the number of states that the accumulator contains
-    fn state_fields(&self) -> Result<Vec<Field>>;
-
-    /// expressions that are passed to the Accumulator.
-    /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
-    /// Human readable name such as `"MIN(c2)"`. The default
-    /// implementation returns placeholder text.
-    fn name(&self) -> &str {
-        "AggregateExpr: default name"
-    }
-}
-
-/// A window expression that:
-/// * knows its resulting field
-pub trait WindowExpr: Send + Sync + Debug {
-    /// Returns the window expression as [`Any`](std::any::Any) so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-
-    /// the field of the final result of this window function.
-    fn field(&self) -> Result<Field>;
-
-    /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
-    /// implementation returns placeholder text.
-    fn name(&self) -> &str {
-        "WindowExpr: default name"
-    }
-
-    /// expressions that are passed to the WindowAccumulator.
-    /// Functions which take a single input argument, such as `sum`, return a single [`datafusion_expr::expr::Expr`],
-    /// others (e.g. `cov`) return many.
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
-    /// evaluate the window function arguments against the batch and return
-    /// array ref, normally the resulting vec is a single element one.
-    fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
-        self.expressions()
-            .iter()
-            .map(|e| e.evaluate(batch))
-            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
-            .collect()
-    }
-
-    /// evaluate the window function values against the batch
-    fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef>;
-
-    /// evaluate the partition points given the sort columns; if the sort columns are
-    /// empty then the result will be a single element vec of the whole column rows.
-    fn evaluate_partition_points(
-        &self,
-        num_rows: usize,
-        partition_columns: &[SortColumn],
-    ) -> Result<Vec<Range<usize>>> {
-        if partition_columns.is_empty() {
-            Ok(vec![Range {
-                start: 0,
-                end: num_rows,
-            }])
-        } else {
-            Ok(lexicographical_partition_ranges(partition_columns)
-                .map_err(DataFusionError::ArrowError)?
-                .collect::<Vec<_>>())
-        }
-    }
-
-    /// expressions that's from the window function's partition by clause, empty if absent
-    fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
-
-    /// expressions that's from the window function's order by clause, empty if absent
-    fn order_by(&self) -> &[PhysicalSortExpr];
-
-    /// get partition columns that can be used for partitioning, empty if absent
-    fn partition_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
-        self.partition_by()
-            .iter()
-            .map(|expr| {
-                PhysicalSortExpr {
-                    expr: expr.clone(),
-                    options: SortOptions::default(),
-                }
-                .evaluate_to_sort_column(batch)
-            })
-            .collect()
-    }
-
-    /// get sort columns that can be used for peer evaluation, empty if absent
-    fn sort_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> {
-        let mut sort_columns = self.partition_columns(batch)?;
-        let order_by_columns = self
-            .order_by()
-            .iter()
-            .map(|e| e.evaluate_to_sort_column(batch))
-            .collect::<Result<Vec<SortColumn>>>()?;
-        sort_columns.extend(order_by_columns);
-        Ok(sort_columns)
-    }
-}
+pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, WindowExpr};
 
 /// Applies an optional projection to a [`SchemaRef`], returning the
 /// projected schema