You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/19 09:08:21 UTC

[arrow-datafusion] 01/01: move built-in window expr and partition evaluator

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-window-partition-eval-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 9e24e33dc572d6538ab929d9db9d07a8e4c18c69
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sat Feb 19 17:06:15 2022 +0800

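    Move the BuiltInWindowFunctionExpr and PartitionEvaluator traits (and the
    find_ranges_in_range helper) from datafusion into datafusion-physical-expr.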
---
 datafusion-physical-expr/src/lib.rs                |  3 +-
 .../src/window/built_in_window_function_expr.rs    | 53 +++++++++++++
 .../src/{lib.rs => window/mod.rs}                  | 12 +--
 .../src/window/partition_evaluator.rs              | 84 +++++++++++++++++++++
 .../src/{ => window}/window_expr.rs                |  0
 .../src/physical_plan/expressions/cume_dist.rs     |  5 +-
 .../src/physical_plan/expressions/lead_lag.rs      |  5 +-
 .../src/physical_plan/expressions/nth_value.rs     |  5 +-
 datafusion/src/physical_plan/expressions/rank.rs   |  5 +-
 .../src/physical_plan/expressions/row_number.rs    |  5 +-
 datafusion/src/physical_plan/mod.rs                |  3 +-
 datafusion/src/physical_plan/window_functions.rs   | 88 +---------------------
 datafusion/src/physical_plan/windows/aggregate.rs  |  2 +-
 datafusion/src/physical_plan/windows/built_in.rs   |  6 +-
 datafusion/src/physical_plan/windows/mod.rs        | 24 +-----
 15 files changed, 165 insertions(+), 135 deletions(-)

diff --git a/datafusion-physical-expr/src/lib.rs b/datafusion-physical-expr/src/lib.rs
index 63edaa5..6beffd1 100644
--- a/datafusion-physical-expr/src/lib.rs
+++ b/datafusion-physical-expr/src/lib.rs
@@ -18,9 +18,8 @@
 mod aggregate_expr;
 mod physical_expr;
 mod sort_expr;
-mod window_expr;
+pub mod window;
 
 pub use aggregate_expr::AggregateExpr;
 pub use physical_expr::PhysicalExpr;
 pub use sort_expr::PhysicalSortExpr;
-pub use window_expr::WindowExpr;
diff --git a/datafusion-physical-expr/src/window/built_in_window_function_expr.rs b/datafusion-physical-expr/src/window/built_in_window_function_expr.rs
new file mode 100644
index 0000000..43e1272
--- /dev/null
+++ b/datafusion-physical-expr/src/window/built_in_window_function_expr.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::partition_evaluator::PartitionEvaluator;
+use crate::PhysicalExpr;
+use arrow::datatypes::Field;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use std::any::Any;
+use std::sync::Arc;
+
+/// A window expression that is a built-in window function (e.g. `RANK()`).
+///
+/// Note that unlike aggregation based window functions, built-in window functions normally ignore
+/// window frame spec, with the exception of first_value, last_value, and nth_value.
+pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
+    /// Returns this window function expression as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// The [`Field`] describing the output column produced by this window function.
+    fn field(&self) -> Result<Field>;
+
+    /// The physical expressions this window function takes as its arguments.
+    /// NOTE(review): presumably evaluated against the input batch by implementors — confirm.
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+    /// Human readable name such as `"RANK()"`. The default
+    /// implementation returns placeholder text.
+    fn name(&self) -> &str {
+        "BuiltInWindowFunctionExpr: default name"
+    }
+
+    /// Creates a [`PartitionEvaluator`] for the given input `batch`.
+    fn create_evaluator(
+        &self,
+        batch: &RecordBatch,
+    ) -> Result<Box<dyn PartitionEvaluator>>;
+}
diff --git a/datafusion-physical-expr/src/lib.rs b/datafusion-physical-expr/src/window/mod.rs
similarity index 78%
copy from datafusion-physical-expr/src/lib.rs
copy to datafusion-physical-expr/src/window/mod.rs
index 63edaa5..64f8b33 100644
--- a/datafusion-physical-expr/src/lib.rs
+++ b/datafusion-physical-expr/src/window/mod.rs
@@ -15,12 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod aggregate_expr;
-mod physical_expr;
-mod sort_expr;
-mod window_expr;
-
-pub use aggregate_expr::AggregateExpr;
-pub use physical_expr::PhysicalExpr;
-pub use sort_expr::PhysicalSortExpr;
-pub use window_expr::WindowExpr;
+pub mod built_in_window_function_expr;
+pub mod partition_evaluator;
+pub mod window_expr;
diff --git a/datafusion-physical-expr/src/window/partition_evaluator.rs b/datafusion-physical-expr/src/window/partition_evaluator.rs
new file mode 100644
index 0000000..9afdf38
--- /dev/null
+++ b/datafusion-physical-expr/src/window/partition_evaluator.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use datafusion_common::DataFusionError;
+use datafusion_common::Result;
+use std::ops::Range;
+
+/// Returns the sub-slice of `sort_partition_points` whose ranges fall within `partition_range`.
+///
+/// `sort_partition_points` is expected to be sorted by [partition columns..., order columns...],
+/// so its split boundaries align with the partition boundaries. Binary search finds the first
+/// range starting at or after the partition start, then the run of ranges ending by its end.
+pub fn find_ranges_in_range<'a>(
+    partition_range: &Range<usize>,
+    sort_partition_points: &'a [Range<usize>],
+) -> &'a [Range<usize>] {
+    let start_idx = sort_partition_points
+        .partition_point(|sort_range| sort_range.start < partition_range.start);
+    let end_idx = start_idx
+        + sort_partition_points[start_idx..]
+            .partition_point(|sort_range| sort_range.end <= partition_range.end);
+    &sort_partition_points[start_idx..end_idx]
+}
+
+/// Evaluates a built-in window function over row ranges (partitions), one array per partition.
+pub trait PartitionEvaluator {
+    /// Whether this evaluator should be driven via the rank-aware methods; defaults to `false`.
+    fn include_rank(&self) -> bool {
+        false
+    }
+
+    /// Evaluates every partition via `evaluate_partition`; stops at the first error.
+    fn evaluate(&self, partition_points: Vec<Range<usize>>) -> Result<Vec<ArrayRef>> {
+        partition_points
+            .into_iter()
+            .map(|partition| self.evaluate_partition(partition))
+            .collect()
+    }
+
+    /// Like `evaluate`, but also passes each partition the sort ranges (ranks) falling inside it.
+    fn evaluate_with_rank(
+        &self,
+        partition_points: Vec<Range<usize>>,
+        sort_partition_points: Vec<Range<usize>>,
+    ) -> Result<Vec<ArrayRef>> {
+        partition_points
+            .into_iter()
+            .map(|partition| {
+                let ranks_in_partition =
+                    find_ranges_in_range(&partition, &sort_partition_points);
+                self.evaluate_partition_with_rank(partition, ranks_in_partition)
+            })
+            .collect()
+    }
+
+    /// Evaluates a single partition (row range) into one array; implementors must provide this.
+    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>;
+
+    /// Rank-aware variant of `evaluate_partition`; returns `NotImplemented` unless overridden.
+    fn evaluate_partition_with_rank(
+        &self,
+        _partition: Range<usize>,
+        _ranks_in_partition: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        Err(DataFusionError::NotImplemented(
+            "evaluate_partition_with_rank is not implemented by default".into(),
+        ))
+    }
+}
diff --git a/datafusion-physical-expr/src/window_expr.rs b/datafusion-physical-expr/src/window/window_expr.rs
similarity index 100%
rename from datafusion-physical-expr/src/window_expr.rs
rename to datafusion-physical-expr/src/window/window_expr.rs
diff --git a/datafusion/src/physical_plan/expressions/cume_dist.rs b/datafusion/src/physical_plan/expressions/cume_dist.rs
index 7b0a45a..09c34c8 100644
--- a/datafusion/src/physical_plan/expressions/cume_dist.rs
+++ b/datafusion/src/physical_plan/expressions/cume_dist.rs
@@ -19,12 +19,13 @@
 //! at runtime during query execution
 
 use crate::error::Result;
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
 use arrow::array::ArrayRef;
 use arrow::array::Float64Array;
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::partition_evaluator::PartitionEvaluator;
 use std::any::Any;
 use std::iter;
 use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/lead_lag.rs b/datafusion/src/physical_plan/expressions/lead_lag.rs
index 52f80c3..c083da6 100644
--- a/datafusion/src/physical_plan/expressions/lead_lag.rs
+++ b/datafusion/src/physical_plan/expressions/lead_lag.rs
@@ -19,13 +19,14 @@
 //! at runtime during query execution
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
 use crate::scalar::ScalarValue;
 use arrow::array::ArrayRef;
 use arrow::compute::cast;
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::partition_evaluator::PartitionEvaluator;
 use std::any::Any;
 use std::ops::Neg;
 use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs
index 64a526a..1d7e98c 100644
--- a/datafusion/src/physical_plan/expressions/nth_value.rs
+++ b/datafusion/src/physical_plan/expressions/nth_value.rs
@@ -19,13 +19,14 @@
 //! that can evaluated at runtime during query execution
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
 use crate::scalar::ScalarValue;
 use arrow::array::{new_null_array, ArrayRef};
 use arrow::compute::kernels::window::shift;
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::partition_evaluator::PartitionEvaluator;
 use std::any::Any;
 use std::iter;
 use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/rank.rs b/datafusion/src/physical_plan/expressions/rank.rs
index 4facb8b..af47e10 100644
--- a/datafusion/src/physical_plan/expressions/rank.rs
+++ b/datafusion/src/physical_plan/expressions/rank.rs
@@ -19,12 +19,13 @@
 //! at runtime during query execution
 
 use crate::error::Result;
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
 use arrow::array::ArrayRef;
 use arrow::array::{Float64Array, UInt64Array};
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::partition_evaluator::PartitionEvaluator;
 use std::any::Any;
 use std::iter;
 use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs
index be87a35..e415493 100644
--- a/datafusion/src/physical_plan/expressions/row_number.rs
+++ b/datafusion/src/physical_plan/expressions/row_number.rs
@@ -18,11 +18,12 @@
 //! Defines physical expression for `row_number` that can evaluated at runtime during query execution
 
 use crate::error::Result;
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
 use arrow::array::{ArrayRef, UInt64Array};
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::partition_evaluator::PartitionEvaluator;
 use std::any::Any;
 use std::ops::Range;
 use std::sync::Arc;
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 9949982..e049304 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -469,7 +469,8 @@ pub enum Distribution {
     HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
 }
 
-pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, WindowExpr};
+pub use datafusion_physical_expr::window::window_expr::WindowExpr;
+pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
 
 /// Applies an optional projection to a [`SchemaRef`], returning the
 /// projected schema
diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs
index b5445ae..b667bfe 100644
--- a/datafusion/src/physical_plan/window_functions.rs
+++ b/datafusion/src/physical_plan/window_functions.rs
@@ -20,20 +20,11 @@
 //!
 //! see also <https://www.postgresql.org/docs/current/functions-window.html>
 
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
 use crate::physical_plan::functions::{TypeSignature, Volatility};
-use crate::physical_plan::{
-    aggregates, functions::Signature, type_coercion::data_types,
-    windows::find_ranges_in_range, PhysicalExpr,
-};
-use arrow::array::ArrayRef;
+use crate::physical_plan::{aggregates, functions::Signature, type_coercion::data_types};
 use arrow::datatypes::DataType;
-use arrow::datatypes::Field;
-use arrow::record_batch::RecordBatch;
 pub use datafusion_expr::{BuiltInWindowFunction, WindowFunction};
-use std::any::Any;
-use std::ops::Range;
-use std::sync::Arc;
 
 /// Returns the datatype of the window function
 pub fn return_type(
@@ -112,81 +103,6 @@ pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature {
     }
 }
 
-/// Partition evaluator
-pub(crate) trait PartitionEvaluator {
-    /// Whether the evaluator should be evaluated with rank
-    fn include_rank(&self) -> bool {
-        false
-    }
-
-    /// evaluate the partition evaluator against the partitions
-    fn evaluate(&self, partition_points: Vec<Range<usize>>) -> Result<Vec<ArrayRef>> {
-        partition_points
-            .into_iter()
-            .map(|partition| self.evaluate_partition(partition))
-            .collect()
-    }
-
-    /// evaluate the partition evaluator against the partitions with rank information
-    fn evaluate_with_rank(
-        &self,
-        partition_points: Vec<Range<usize>>,
-        sort_partition_points: Vec<Range<usize>>,
-    ) -> Result<Vec<ArrayRef>> {
-        partition_points
-            .into_iter()
-            .map(|partition| {
-                let ranks_in_partition =
-                    find_ranges_in_range(&partition, &sort_partition_points);
-                self.evaluate_partition_with_rank(partition, ranks_in_partition)
-            })
-            .collect()
-    }
-
-    /// evaluate the partition evaluator against the partition
-    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>;
-
-    /// evaluate the partition evaluator against the partition but with rank
-    fn evaluate_partition_with_rank(
-        &self,
-        _partition: Range<usize>,
-        _ranks_in_partition: &[Range<usize>],
-    ) -> Result<ArrayRef> {
-        Err(DataFusionError::NotImplemented(
-            "evaluate_partition_with_rank is not implemented by default".into(),
-        ))
-    }
-}
-
-/// A window expression that is a built-in window function.
-///
-/// Note that unlike aggregation based window functions, built-in window functions normally ignore
-/// window frame spec, with the exception of first_value, last_value, and nth_value.
-pub(crate) trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
-    /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-
-    /// the field of the final result of this aggregation.
-    fn field(&self) -> Result<Field>;
-
-    /// expressions that are passed to the Accumulator.
-    /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
-    /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
-    /// implementation returns placeholder text.
-    fn name(&self) -> &str {
-        "BuiltInWindowFunctionExpr: default name"
-    }
-
-    /// Create built-in window evaluator with a batch
-    fn create_evaluator(
-        &self,
-        batch: &RecordBatch,
-    ) -> Result<Box<dyn PartitionEvaluator>>;
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/src/physical_plan/windows/aggregate.rs b/datafusion/src/physical_plan/windows/aggregate.rs
index 4c97e2b..5a6f1fe 100644
--- a/datafusion/src/physical_plan/windows/aggregate.rs
+++ b/datafusion/src/physical_plan/windows/aggregate.rs
@@ -18,7 +18,6 @@
 //! Physical exec for aggregate window function expressions.
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::windows::find_ranges_in_range;
 use crate::physical_plan::{
     expressions::PhysicalSortExpr, Accumulator, AggregateExpr, PhysicalExpr, WindowExpr,
 };
@@ -26,6 +25,7 @@ use arrow::compute::concat;
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
 use datafusion_expr::{WindowFrame, WindowFrameUnits};
+use datafusion_physical_expr::window::partition_evaluator::find_ranges_in_range;
 use std::any::Any;
 use std::iter::IntoIterator;
 use std::ops::Range;
diff --git a/datafusion/src/physical_plan/windows/built_in.rs b/datafusion/src/physical_plan/windows/built_in.rs
index de627cb..4f1dfba 100644
--- a/datafusion/src/physical_plan/windows/built_in.rs
+++ b/datafusion/src/physical_plan/windows/built_in.rs
@@ -18,13 +18,11 @@
 //! Physical exec for built-in window function expressions.
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{
-    expressions::PhysicalSortExpr, window_functions::BuiltInWindowFunctionExpr,
-    PhysicalExpr, WindowExpr,
-};
+use crate::physical_plan::{expressions::PhysicalSortExpr, PhysicalExpr, WindowExpr};
 use arrow::compute::concat;
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
 use std::any::Any;
 use std::sync::Arc;
 
diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs
index b3bf9ce..9deab56 100644
--- a/datafusion/src/physical_plan/windows/mod.rs
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -25,17 +25,14 @@ use crate::physical_plan::{
         PhysicalSortExpr, RowNumber,
     },
     type_coercion::coerce,
-    window_functions::{
-        signature_for_built_in, BuiltInWindowFunction, BuiltInWindowFunctionExpr,
-        WindowFunction,
-    },
+    window_functions::{signature_for_built_in, BuiltInWindowFunction, WindowFunction},
     PhysicalExpr, WindowExpr,
 };
 use crate::scalar::ScalarValue;
 use arrow::datatypes::Schema;
 use datafusion_expr::WindowFrame;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
 use std::convert::TryInto;
-use std::ops::Range;
 use std::sync::Arc;
 
 mod aggregate;
@@ -153,23 +150,6 @@ fn create_built_in_window_expr(
     })
 }
 
-/// Given a partition range, and the full list of sort partition points, given that the sort
-/// partition points are sorted using [partition columns..., order columns...], the split
-/// boundaries would align (what's sorted on [partition columns...] would definitely be sorted
-/// on finer columns), so this will use binary search to find ranges that are within the
-/// partition range and return the valid slice.
-pub(crate) fn find_ranges_in_range<'a>(
-    partition_range: &Range<usize>,
-    sort_partition_points: &'a [Range<usize>],
-) -> &'a [Range<usize>] {
-    let start_idx = sort_partition_points
-        .partition_point(|sort_range| sort_range.start < partition_range.start);
-    let end_idx = start_idx
-        + sort_partition_points[start_idx..]
-            .partition_point(|sort_range| sort_range.end <= partition_range.end);
-    &sort_partition_points[start_idx..end_idx]
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;