You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by yj...@apache.org on 2022/04/25 23:53:58 UTC

[arrow-datafusion] branch master updated: [physical-expr] move aggregate exprs and window exprs to their own modules (#2335)

This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 510724288 [physical-expr] move aggregate exprs and window exprs to their own modules (#2335)
510724288 is described below

commit 5107242881e1d091f7defd04c6bf993c216ebf87
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Tue Apr 26 07:53:54 2022 +0800

    [physical-expr] move aggregate exprs and window exprs to their own modules (#2335)
    
    * [physical-expr] move aggregate exprs and window exprs to their own modules
    
    * fix
---
 .../core/src/physical_plan/aggregate_rule.rs       | 20 ------
 .../src/physical_plan/aggregates}/mod.rs           |  8 +--
 datafusion/core/src/physical_plan/mod.rs           |  1 -
 .../{expressions => aggregate}/approx_distinct.rs  |  5 +-
 .../{expressions => aggregate}/approx_median.rs    |  0
 .../approx_percentile_cont.rs                      | 10 ++-
 .../approx_percentile_cont_with_weight.rs          |  8 +--
 .../src/{expressions => aggregate}/array_agg.rs    |  2 +-
 .../src/{expressions => aggregate}/average.rs      |  4 +-
 .../src/aggregate/build_in.rs}                     | 11 ++--
 .../coercion_rule.rs}                              |  2 +-
 .../src/{expressions => aggregate}/correlation.rs  | 11 ++--
 .../src/{expressions => aggregate}/count.rs        |  2 +-
 .../src/{expressions => aggregate}/covariance.rs   |  3 +-
 .../distinct_expressions.rs                        |  0
 .../mod.rs => aggregate/hyperloglog.rs}            |  0
 .../src/{expressions => aggregate}/min_max.rs      |  2 +-
 .../src/{aggregate_expr.rs => aggregate/mod.rs}    | 21 +++++++
 .../src/{expressions => aggregate}/stats.rs        |  0
 .../src/{expressions => aggregate}/stddev.rs       |  7 ++-
 .../src/{expressions => aggregate}/sum.rs          |  6 +-
 .../src/{tdigest/mod.rs => aggregate/tdigest.rs}   |  0
 .../src/{expressions => aggregate}/variance.rs     |  4 +-
 datafusion/physical-expr/src/expressions/mod.rs    | 71 ++++++++--------------
 datafusion/physical-expr/src/lib.rs                |  7 +--
 .../src/{expressions => window}/cume_dist.rs       |  0
 .../src/{expressions => window}/lead_lag.rs        |  0
 datafusion/physical-expr/src/window/mod.rs         |  5 ++
 .../src/{expressions => window}/nth_value.rs       |  0
 .../src/{expressions => window}/rank.rs            |  0
 .../src/{expressions => window}/row_number.rs      |  0
 31 files changed, 94 insertions(+), 116 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregate_rule.rs b/datafusion/core/src/physical_plan/aggregate_rule.rs
deleted file mode 100644
index 71775eba9..000000000
--- a/datafusion/core/src/physical_plan/aggregate_rule.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Support the coercion rule for aggregate function.
-
-pub use datafusion_physical_expr::coercion_rule::aggregate_rule::coerce_exprs;
diff --git a/datafusion/physical-expr/src/coercion_rule/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs
similarity index 84%
rename from datafusion/physical-expr/src/coercion_rule/mod.rs
rename to datafusion/core/src/physical_plan/aggregates/mod.rs
index 0b2df7a84..c0208b239 100644
--- a/datafusion/physical-expr/src/coercion_rule/mod.rs
+++ b/datafusion/core/src/physical_plan/aggregates/mod.rs
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Define coercion rules for different Expr type.
-//!
-//! Aggregate function rule
-//! Binary operation rule
+//! Aggregate function support: re-exports used to build aggregate expressions.
 
-pub mod aggregate_rule;
+pub use datafusion_expr::AggregateFunction;
+pub use datafusion_physical_expr::expressions::create_aggregate_expr;
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index 984859168..b7b25a636 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -552,7 +552,6 @@ pub mod explain;
 use crate::execution::context::TaskContext;
 pub use datafusion_physical_expr::expressions;
 
-pub mod aggregate_rule;
 pub mod file_format;
 pub mod filter;
 pub mod functions;
diff --git a/datafusion/physical-expr/src/expressions/approx_distinct.rs b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
similarity index 98%
rename from datafusion/physical-expr/src/expressions/approx_distinct.rs
rename to datafusion/physical-expr/src/aggregate/approx_distinct.rs
index 610f381bb..391258447 100644
--- a/datafusion/physical-expr/src/expressions/approx_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
@@ -17,8 +17,9 @@
 
 //! Defines physical expressions that can evaluated at runtime during query execution
 
-use super::format_state_name;
-use crate::{hyperloglog::HyperLogLog, AggregateExpr, PhysicalExpr};
+use super::hyperloglog::HyperLogLog;
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, PhysicalExpr};
 use arrow::array::{
     ArrayRef, BinaryArray, BinaryOffsetSizeTrait, GenericBinaryArray, GenericStringArray,
     PrimitiveArray, StringOffsetSizeTrait,
diff --git a/datafusion/physical-expr/src/expressions/approx_median.rs b/datafusion/physical-expr/src/aggregate/approx_median.rs
similarity index 100%
rename from datafusion/physical-expr/src/expressions/approx_median.rs
rename to datafusion/physical-expr/src/aggregate/approx_median.rs
diff --git a/datafusion/physical-expr/src/expressions/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
similarity index 98%
rename from datafusion/physical-expr/src/expressions/approx_percentile_cont.rs
rename to datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
index 65d478cc7..5e8ab1712 100644
--- a/datafusion/physical-expr/src/expressions/approx_percentile_cont.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
@@ -15,12 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::{format_state_name, Literal};
-use crate::tdigest::TryIntoOrderedF64;
-use crate::{
-    tdigest::{TDigest, DEFAULT_MAX_SIZE},
-    AggregateExpr, PhysicalExpr,
-};
+use crate::aggregate::tdigest::TryIntoOrderedF64;
+use crate::aggregate::tdigest::{TDigest, DEFAULT_MAX_SIZE};
+use crate::expressions::{format_state_name, Literal};
+use crate::{AggregateExpr, PhysicalExpr};
 use arrow::{
     array::{
         ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
diff --git a/datafusion/physical-expr/src/expressions/approx_percentile_cont_with_weight.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
similarity index 96%
rename from datafusion/physical-expr/src/expressions/approx_percentile_cont_with_weight.rs
rename to datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
index 1beb7a86c..f9874b0a5 100644
--- a/datafusion/physical-expr/src/expressions/approx_percentile_cont_with_weight.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
@@ -15,12 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::expressions::approx_percentile_cont::ApproxPercentileAccumulator;
+use crate::aggregate::approx_percentile_cont::ApproxPercentileAccumulator;
+use crate::aggregate::tdigest::{Centroid, TDigest, DEFAULT_MAX_SIZE};
 use crate::expressions::ApproxPercentileCont;
-use crate::{
-    tdigest::{Centroid, TDigest, DEFAULT_MAX_SIZE},
-    AggregateExpr, PhysicalExpr,
-};
+use crate::{AggregateExpr, PhysicalExpr};
 use arrow::{
     array::ArrayRef,
     datatypes::{DataType, Field},
diff --git a/datafusion/physical-expr/src/expressions/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs
similarity index 99%
rename from datafusion/physical-expr/src/expressions/array_agg.rs
rename to datafusion/physical-expr/src/aggregate/array_agg.rs
index e187930f3..4f2bc3ece 100644
--- a/datafusion/physical-expr/src/expressions/array_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -17,7 +17,7 @@
 
 //! Defines physical expressions that can evaluated at runtime during query execution
 
-use super::format_state_name;
+use crate::expressions::format_state_name;
 use crate::{AggregateExpr, PhysicalExpr};
 use arrow::array::ArrayRef;
 use arrow::datatypes::{DataType, Field};
diff --git a/datafusion/physical-expr/src/expressions/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
similarity index 99%
rename from datafusion/physical-expr/src/expressions/average.rs
rename to datafusion/physical-expr/src/aggregate/average.rs
index 5c26e6c9d..637a7f99d 100644
--- a/datafusion/physical-expr/src/expressions/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -21,6 +21,8 @@ use std::any::Any;
 use std::convert::TryFrom;
 use std::sync::Arc;
 
+use crate::aggregate::sum;
+use crate::expressions::format_state_name;
 use crate::{AggregateExpr, PhysicalExpr};
 use arrow::compute;
 use arrow::datatypes::DataType;
@@ -32,8 +34,6 @@ use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
-use super::{format_state_name, sum};
-
 /// AVG aggregate expression
 #[derive(Debug)]
 pub struct Avg {
diff --git a/datafusion/core/src/physical_plan/aggregates.rs b/datafusion/physical-expr/src/aggregate/build_in.rs
similarity index 99%
rename from datafusion/core/src/physical_plan/aggregates.rs
rename to datafusion/physical-expr/src/aggregate/build_in.rs
index f04c81c2a..f91e01336 100644
--- a/datafusion/core/src/physical_plan/aggregates.rs
+++ b/datafusion/physical-expr/src/aggregate/build_in.rs
@@ -26,11 +26,10 @@
 //! * Signature: see `Signature`
 //! * Return type: a function `(arg_types) -> return_type`. E.g. for min, ([f32]) -> f32, ([f64]) -> f64.
 
-use super::aggregate_rule::coerce_exprs;
-use super::{AggregateExpr, PhysicalExpr};
-use crate::error::{DataFusionError, Result};
-use crate::physical_plan::expressions;
+use crate::aggregate::coercion_rule::coerce_exprs;
+use crate::{expressions, AggregateExpr, PhysicalExpr};
 use arrow::datatypes::Schema;
+use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::aggregate_function;
 use datafusion_expr::aggregate_function::return_type;
 pub use datafusion_expr::AggregateFunction;
@@ -257,13 +256,13 @@ pub fn create_aggregate_expr(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::physical_plan::expressions::{
+    use crate::expressions::{
         ApproxDistinct, ApproxMedian, ApproxPercentileCont, ArrayAgg, Avg, Correlation,
         Count, Covariance, DistinctArrayAgg, DistinctCount, Max, Min, Stddev, Sum,
         Variance,
     };
-    use crate::{error::Result, scalar::ScalarValue};
     use arrow::datatypes::{DataType, Field};
+    use datafusion_common::ScalarValue;
     use datafusion_expr::aggregate_function::NUMERICS;
 
     #[test]
diff --git a/datafusion/physical-expr/src/coercion_rule/aggregate_rule.rs b/datafusion/physical-expr/src/aggregate/coercion_rule.rs
similarity index 97%
rename from datafusion/physical-expr/src/coercion_rule/aggregate_rule.rs
rename to datafusion/physical-expr/src/aggregate/coercion_rule.rs
index 7cf9c0301..7b1e26ed1 100644
--- a/datafusion/physical-expr/src/coercion_rule/aggregate_rule.rs
+++ b/datafusion/physical-expr/src/aggregate/coercion_rule.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Support the coercion rule for aggregate function.
+//! Defines the type coercion rules for aggregate functions.
 
 use crate::expressions::try_cast;
 use crate::PhysicalExpr;
diff --git a/datafusion/physical-expr/src/expressions/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs
similarity index 98%
rename from datafusion/physical-expr/src/expressions/correlation.rs
rename to datafusion/physical-expr/src/aggregate/correlation.rs
index 516dca305..0a01cf7df 100644
--- a/datafusion/physical-expr/src/expressions/correlation.rs
+++ b/datafusion/physical-expr/src/aggregate/correlation.rs
@@ -17,10 +17,11 @@
 
 //! Defines physical expressions that can evaluated at runtime during query execution
 
-use crate::{
-    expressions::{covariance::CovarianceAccumulator, stddev::StddevAccumulator},
-    AggregateExpr, PhysicalExpr,
-};
+use crate::aggregate::covariance::CovarianceAccumulator;
+use crate::aggregate::stats::StatsType;
+use crate::aggregate::stddev::StddevAccumulator;
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, PhysicalExpr};
 use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};
 use datafusion_common::Result;
 use datafusion_common::ScalarValue;
@@ -28,8 +29,6 @@ use datafusion_expr::Accumulator;
 use std::any::Any;
 use std::sync::Arc;
 
-use super::{format_state_name, StatsType};
-
 /// CORR aggregate expression
 #[derive(Debug)]
 pub struct Correlation {
diff --git a/datafusion/physical-expr/src/expressions/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
similarity index 99%
rename from datafusion/physical-expr/src/expressions/count.rs
rename to datafusion/physical-expr/src/aggregate/count.rs
index ccc5a8ebd..9e8485e92 100644
--- a/datafusion/physical-expr/src/expressions/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -31,7 +31,7 @@ use datafusion_common::Result;
 use datafusion_common::ScalarValue;
 use datafusion_expr::Accumulator;
 
-use super::format_state_name;
+use crate::expressions::format_state_name;
 
 /// COUNT aggregate expression
 /// Returns the amount of non-null values of the given expression.
diff --git a/datafusion/physical-expr/src/expressions/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs
similarity index 99%
rename from datafusion/physical-expr/src/expressions/covariance.rs
rename to datafusion/physical-expr/src/aggregate/covariance.rs
index dd0895cba..80d403b65 100644
--- a/datafusion/physical-expr/src/expressions/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -32,7 +32,8 @@ use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
-use super::{format_state_name, StatsType};
+use crate::aggregate::stats::StatsType;
+use crate::expressions::format_state_name;
 
 /// COVAR and COVAR_SAMP aggregate expression
 #[derive(Debug)]
diff --git a/datafusion/physical-expr/src/expressions/distinct_expressions.rs b/datafusion/physical-expr/src/aggregate/distinct_expressions.rs
similarity index 100%
rename from datafusion/physical-expr/src/expressions/distinct_expressions.rs
rename to datafusion/physical-expr/src/aggregate/distinct_expressions.rs
diff --git a/datafusion/physical-expr/src/hyperloglog/mod.rs b/datafusion/physical-expr/src/aggregate/hyperloglog.rs
similarity index 100%
rename from datafusion/physical-expr/src/hyperloglog/mod.rs
rename to datafusion/physical-expr/src/aggregate/hyperloglog.rs
diff --git a/datafusion/physical-expr/src/expressions/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
similarity index 99%
rename from datafusion/physical-expr/src/expressions/min_max.rs
rename to datafusion/physical-expr/src/aggregate/min_max.rs
index a599d65c4..7de10e4b8 100644
--- a/datafusion/physical-expr/src/expressions/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -37,7 +37,7 @@ use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
-use super::format_state_name;
+use crate::expressions::format_state_name;
 use arrow::array::Array;
 use arrow::array::DecimalArray;
 
diff --git a/datafusion/physical-expr/src/aggregate_expr.rs b/datafusion/physical-expr/src/aggregate/mod.rs
similarity index 82%
rename from datafusion/physical-expr/src/aggregate_expr.rs
rename to datafusion/physical-expr/src/aggregate/mod.rs
index a22472a49..019a60cd5 100644
--- a/datafusion/physical-expr/src/aggregate_expr.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -23,6 +23,27 @@ use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
 
+pub(crate) mod approx_distinct;
+pub(crate) mod approx_median;
+pub(crate) mod approx_percentile_cont;
+pub(crate) mod approx_percentile_cont_with_weight;
+pub(crate) mod array_agg;
+pub(crate) mod average;
+pub(crate) mod coercion_rule;
+pub(crate) mod correlation;
+pub(crate) mod count;
+pub(crate) mod covariance;
+pub(crate) mod distinct_expressions;
+#[macro_use]
+pub(crate) mod min_max;
+pub mod build_in;
+mod hyperloglog;
+pub(crate) mod stats;
+pub(crate) mod stddev;
+pub(crate) mod sum;
+mod tdigest;
+pub(crate) mod variance;
+
 /// An aggregate expression that:
 /// * knows its resulting field
 /// * knows how to create its accumulator
diff --git a/datafusion/physical-expr/src/expressions/stats.rs b/datafusion/physical-expr/src/aggregate/stats.rs
similarity index 100%
rename from datafusion/physical-expr/src/expressions/stats.rs
rename to datafusion/physical-expr/src/aggregate/stats.rs
diff --git a/datafusion/physical-expr/src/expressions/stddev.rs b/datafusion/physical-expr/src/aggregate/stddev.rs
similarity index 98%
rename from datafusion/physical-expr/src/expressions/stddev.rs
rename to datafusion/physical-expr/src/aggregate/stddev.rs
index 425d6f36b..560ea6f6b 100644
--- a/datafusion/physical-expr/src/expressions/stddev.rs
+++ b/datafusion/physical-expr/src/aggregate/stddev.rs
@@ -20,14 +20,15 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use crate::{expressions::variance::VarianceAccumulator, AggregateExpr, PhysicalExpr};
+use crate::aggregate::stats::StatsType;
+use crate::aggregate::variance::VarianceAccumulator;
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, PhysicalExpr};
 use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};
 use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
-use super::{format_state_name, StatsType};
-
 /// STDDEV and STDDEV_SAMP (standard deviation) aggregate expression
 #[derive(Debug)]
 pub struct Stddev {
diff --git a/datafusion/physical-expr/src/expressions/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
similarity index 99%
rename from datafusion/physical-expr/src/expressions/sum.rs
rename to datafusion/physical-expr/src/aggregate/sum.rs
index 84594fe7a..12572f9a9 100644
--- a/datafusion/physical-expr/src/expressions/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -34,7 +34,7 @@ use arrow::{
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::Accumulator;
 
-use super::format_state_name;
+use crate::expressions::format_state_name;
 use arrow::array::Array;
 use arrow::array::DecimalArray;
 
@@ -144,7 +144,7 @@ fn sum_decimal_batch(
 }
 
 // sums the array and returns a ScalarValue of its corresponding type.
-pub(super) fn sum_batch(values: &ArrayRef) -> Result<ScalarValue> {
+pub(crate) fn sum_batch(values: &ArrayRef) -> Result<ScalarValue> {
     Ok(match values.data_type() {
         DataType::Decimal(precision, scale) => {
             sum_decimal_batch(values, precision, scale)?
@@ -221,7 +221,7 @@ fn sum_decimal_with_diff_scale(
     }
 }
 
-pub(super) fn sum(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+pub(crate) fn sum(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
     Ok(match (lhs, rhs) {
         (ScalarValue::Decimal128(v1, p1, s1), ScalarValue::Decimal128(v2, p2, s2)) => {
             let max_precision = p1.max(p2);
diff --git a/datafusion/physical-expr/src/tdigest/mod.rs b/datafusion/physical-expr/src/aggregate/tdigest.rs
similarity index 100%
rename from datafusion/physical-expr/src/tdigest/mod.rs
rename to datafusion/physical-expr/src/aggregate/tdigest.rs
diff --git a/datafusion/physical-expr/src/expressions/variance.rs b/datafusion/physical-expr/src/aggregate/variance.rs
similarity index 99%
rename from datafusion/physical-expr/src/expressions/variance.rs
rename to datafusion/physical-expr/src/aggregate/variance.rs
index 5a9dc080d..306f1512f 100644
--- a/datafusion/physical-expr/src/expressions/variance.rs
+++ b/datafusion/physical-expr/src/aggregate/variance.rs
@@ -20,6 +20,8 @@
 use std::any::Any;
 use std::sync::Arc;
 
+use crate::aggregate::stats::StatsType;
+use crate::expressions::format_state_name;
 use crate::{AggregateExpr, PhysicalExpr};
 use arrow::array::Float64Array;
 use arrow::{
@@ -32,8 +34,6 @@ use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
-use super::{format_state_name, StatsType};
-
 /// VAR and VAR_SAMP aggregate expression
 #[derive(Debug)]
 pub struct Variance {
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index a230e790f..813e87c8f 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -17,84 +17,65 @@
 
 //! Defines physical expressions that can evaluated at runtime during query execution
 
-mod approx_distinct;
-mod approx_percentile_cont;
-mod approx_percentile_cont_with_weight;
-mod array_agg;
-mod average;
 #[macro_use]
 mod binary;
 mod case;
 mod cast;
 mod column;
-mod count;
-mod cume_dist;
 mod get_indexed_field;
 mod in_list;
 mod is_not_null;
 mod is_null;
-mod lead_lag;
 mod literal;
-#[macro_use]
-mod min_max;
-mod approx_median;
-mod correlation;
-mod covariance;
-mod distinct_expressions;
 mod negative;
 mod not;
-mod nth_value;
 mod nullif;
-mod rank;
-mod row_number;
-mod stats;
-mod stddev;
-mod sum;
 mod try_cast;
-mod variance;
 
 /// Module with some convenient methods used in expression building
 pub mod helpers {
-    pub use super::min_max::{max, min};
+    pub use crate::aggregate::min_max::{max, min};
 }
 
-pub use approx_distinct::ApproxDistinct;
-pub use approx_median::ApproxMedian;
-pub use approx_percentile_cont::ApproxPercentileCont;
-pub use approx_percentile_cont_with_weight::ApproxPercentileContWithWeight;
-pub use array_agg::ArrayAgg;
-pub use average::{Avg, AvgAccumulator};
+pub use crate::aggregate::approx_distinct::ApproxDistinct;
+pub use crate::aggregate::approx_median::ApproxMedian;
+pub use crate::aggregate::approx_percentile_cont::ApproxPercentileCont;
+pub use crate::aggregate::approx_percentile_cont_with_weight::ApproxPercentileContWithWeight;
+pub use crate::aggregate::array_agg::ArrayAgg;
+pub use crate::aggregate::average::{Avg, AvgAccumulator};
+pub use crate::aggregate::build_in::create_aggregate_expr;
+pub use crate::aggregate::correlation::Correlation;
+pub use crate::aggregate::count::Count;
+pub use crate::aggregate::covariance::{Covariance, CovariancePop};
+pub use crate::aggregate::distinct_expressions::{DistinctArrayAgg, DistinctCount};
+pub use crate::aggregate::min_max::{Max, Min};
+pub use crate::aggregate::min_max::{MaxAccumulator, MinAccumulator};
+pub use crate::aggregate::stats::StatsType;
+pub use crate::aggregate::stddev::{Stddev, StddevPop};
+pub use crate::aggregate::sum::Sum;
+pub use crate::aggregate::variance::{Variance, VariancePop};
+
+pub use crate::window::cume_dist::cume_dist;
+pub use crate::window::lead_lag::{lag, lead};
+pub use crate::window::nth_value::NthValue;
+pub use crate::window::rank::{dense_rank, percent_rank, rank};
+pub use crate::window::row_number::RowNumber;
+
 pub use binary::{binary, BinaryExpr};
 pub use case::{case, CaseExpr};
 pub use cast::{
     cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
 };
 pub use column::{col, Column};
-pub use correlation::Correlation;
-pub use count::Count;
-pub use covariance::{Covariance, CovariancePop};
-pub use cume_dist::cume_dist;
-
-pub use distinct_expressions::{DistinctArrayAgg, DistinctCount};
 pub use get_indexed_field::GetIndexedFieldExpr;
 pub use in_list::{in_list, InListExpr};
 pub use is_not_null::{is_not_null, IsNotNullExpr};
 pub use is_null::{is_null, IsNullExpr};
-pub use lead_lag::{lag, lead};
 pub use literal::{lit, Literal};
-pub use min_max::{Max, Min};
-pub use min_max::{MaxAccumulator, MinAccumulator};
 pub use negative::{negative, NegativeExpr};
 pub use not::{not, NotExpr};
-pub use nth_value::NthValue;
 pub use nullif::nullif_func;
-pub use rank::{dense_rank, percent_rank, rank};
-pub use row_number::RowNumber;
-pub use stats::StatsType;
-pub use stddev::{Stddev, StddevPop};
-pub use sum::Sum;
 pub use try_cast::{try_cast, TryCastExpr};
-pub use variance::{Variance, VariancePop};
 
 /// returns the name of the state
 pub fn format_state_name(name: &str, state_name: &str) -> String {
@@ -103,7 +84,7 @@ pub fn format_state_name(name: &str, state_name: &str) -> String {
 pub use crate::PhysicalSortExpr;
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use crate::AggregateExpr;
     use arrow::record_batch::RecordBatch;
     use datafusion_common::Result;
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 94d003614..38b0da3b8 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -15,28 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod aggregate_expr;
+pub mod aggregate;
 pub mod array_expressions;
-pub mod coercion_rule;
 pub mod conditional_expressions;
 #[cfg(feature = "crypto_expressions")]
 pub mod crypto_expressions;
 pub mod datetime_expressions;
 pub mod expressions;
 mod functions;
-mod hyperloglog;
 pub mod math_expressions;
 mod physical_expr;
 #[cfg(feature = "regex_expressions")]
 pub mod regex_expressions;
 mod sort_expr;
 pub mod string_expressions;
-mod tdigest;
 #[cfg(feature = "unicode_expressions")]
 pub mod unicode_expressions;
 pub mod window;
 
-pub use aggregate_expr::AggregateExpr;
+pub use aggregate::AggregateExpr;
 pub use functions::ScalarFunctionExpr;
 pub use physical_expr::PhysicalExpr;
 pub use sort_expr::PhysicalSortExpr;
diff --git a/datafusion/physical-expr/src/expressions/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs
similarity index 100%
rename from datafusion/physical-expr/src/expressions/cume_dist.rs
rename to datafusion/physical-expr/src/window/cume_dist.rs
diff --git a/datafusion/physical-expr/src/expressions/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs
similarity index 100%
rename from datafusion/physical-expr/src/expressions/lead_lag.rs
rename to datafusion/physical-expr/src/window/lead_lag.rs
diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs
index 044cd1491..059f8b886 100644
--- a/datafusion/physical-expr/src/window/mod.rs
+++ b/datafusion/physical-expr/src/window/mod.rs
@@ -18,7 +18,12 @@
 mod aggregate;
 mod built_in;
 mod built_in_window_function_expr;
+pub(crate) mod cume_dist;
+pub(crate) mod lead_lag;
+pub(crate) mod nth_value;
 pub(crate) mod partition_evaluator;
+pub(crate) mod rank;
+pub(crate) mod row_number;
 mod window_expr;
 
 pub use aggregate::AggregateWindowExpr;
diff --git a/datafusion/physical-expr/src/expressions/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs
similarity index 100%
rename from datafusion/physical-expr/src/expressions/nth_value.rs
rename to datafusion/physical-expr/src/window/nth_value.rs
diff --git a/datafusion/physical-expr/src/expressions/rank.rs b/datafusion/physical-expr/src/window/rank.rs
similarity index 100%
rename from datafusion/physical-expr/src/expressions/rank.rs
rename to datafusion/physical-expr/src/window/rank.rs
diff --git a/datafusion/physical-expr/src/expressions/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs
similarity index 100%
rename from datafusion/physical-expr/src/expressions/row_number.rs
rename to datafusion/physical-expr/src/window/row_number.rs