You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/19 09:10:11 UTC

[arrow-datafusion] branch move-window-partition-eval-expr updated (9e24e33 -> 27fbfcf)

This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a change to branch move-window-partition-eval-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git.


 discard 9e24e33  move built in window expr and partition evaluator
     new 27fbfcf  move built in window expr and partition evaluator

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (9e24e33)
            \
             N -- N -- N   refs/heads/move-window-partition-eval-expr (27fbfcf)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 datafusion-physical-expr/src/window/mod.rs             | 11 ++++++++---
 datafusion/src/physical_plan/expressions/cume_dist.rs  |  4 ++--
 datafusion/src/physical_plan/expressions/lead_lag.rs   |  4 ++--
 datafusion/src/physical_plan/expressions/nth_value.rs  |  4 ++--
 datafusion/src/physical_plan/expressions/rank.rs       |  4 ++--
 datafusion/src/physical_plan/expressions/row_number.rs |  4 ++--
 datafusion/src/physical_plan/mod.rs                    |  2 +-
 datafusion/src/physical_plan/windows/aggregate.rs      |  2 +-
 datafusion/src/physical_plan/windows/built_in.rs       |  2 +-
 datafusion/src/physical_plan/windows/mod.rs            |  2 +-
 10 files changed, 22 insertions(+), 17 deletions(-)

[arrow-datafusion] 01/01: move built in window expr and partition evaluator

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiayuliu pushed a commit to branch move-window-partition-eval-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit 27fbfcf82e542f311896fd18087dbcb1a664bbc6
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sat Feb 19 17:06:15 2022 +0800

    move built in window expr and partition evaluator
---
 datafusion-physical-expr/src/lib.rs                |  3 +-
 .../src/window/built_in_window_function_expr.rs    | 53 +++++++++++++
 .../src/{lib.rs => window/mod.rs}                  | 11 ++-
 .../src/window/partition_evaluator.rs              | 84 +++++++++++++++++++++
 .../src/{ => window}/window_expr.rs                |  0
 .../src/physical_plan/expressions/cume_dist.rs     |  5 +-
 .../src/physical_plan/expressions/lead_lag.rs      |  5 +-
 .../src/physical_plan/expressions/nth_value.rs     |  5 +-
 datafusion/src/physical_plan/expressions/rank.rs   |  5 +-
 .../src/physical_plan/expressions/row_number.rs    |  5 +-
 datafusion/src/physical_plan/mod.rs                |  3 +-
 datafusion/src/physical_plan/window_functions.rs   | 88 +---------------------
 datafusion/src/physical_plan/windows/aggregate.rs  |  2 +-
 datafusion/src/physical_plan/windows/built_in.rs   |  6 +-
 datafusion/src/physical_plan/windows/mod.rs        | 24 +-----
 15 files changed, 167 insertions(+), 132 deletions(-)

diff --git a/datafusion-physical-expr/src/lib.rs b/datafusion-physical-expr/src/lib.rs
index 63edaa5..6beffd1 100644
--- a/datafusion-physical-expr/src/lib.rs
+++ b/datafusion-physical-expr/src/lib.rs
@@ -18,9 +18,8 @@
 mod aggregate_expr;
 mod physical_expr;
 mod sort_expr;
-mod window_expr;
+pub mod window;
 
 pub use aggregate_expr::AggregateExpr;
 pub use physical_expr::PhysicalExpr;
 pub use sort_expr::PhysicalSortExpr;
-pub use window_expr::WindowExpr;
diff --git a/datafusion-physical-expr/src/window/built_in_window_function_expr.rs b/datafusion-physical-expr/src/window/built_in_window_function_expr.rs
new file mode 100644
index 0000000..43e1272
--- /dev/null
+++ b/datafusion-physical-expr/src/window/built_in_window_function_expr.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::partition_evaluator::PartitionEvaluator;
+use crate::PhysicalExpr;
+use arrow::datatypes::Field;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use std::any::Any;
+use std::sync::Arc;
+
+/// A window expression that is a built-in window function.
+///
+/// Note that unlike aggregation based window functions, built-in window functions normally ignore
+/// window frame spec, with the exception of first_value, last_value, and nth_value.
+pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
+    /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// the field of the final result of this aggregation.
+    fn field(&self) -> Result<Field>;
+
+    /// expressions that are passed to the Accumulator.
+    /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+    /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
+    /// implementation returns placeholder text.
+    fn name(&self) -> &str {
+        "BuiltInWindowFunctionExpr: default name"
+    }
+
+    /// Create built-in window evaluator with a batch
+    fn create_evaluator(
+        &self,
+        batch: &RecordBatch,
+    ) -> Result<Box<dyn PartitionEvaluator>>;
+}
diff --git a/datafusion-physical-expr/src/lib.rs b/datafusion-physical-expr/src/window/mod.rs
similarity index 79%
copy from datafusion-physical-expr/src/lib.rs
copy to datafusion-physical-expr/src/window/mod.rs
index 63edaa5..48a6e8b 100644
--- a/datafusion-physical-expr/src/lib.rs
+++ b/datafusion-physical-expr/src/window/mod.rs
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod aggregate_expr;
-mod physical_expr;
-mod sort_expr;
+mod built_in_window_function_expr;
+mod partition_evaluator;
 mod window_expr;
 
-pub use aggregate_expr::AggregateExpr;
-pub use physical_expr::PhysicalExpr;
-pub use sort_expr::PhysicalSortExpr;
+pub use built_in_window_function_expr::BuiltInWindowFunctionExpr;
+pub use partition_evaluator::find_ranges_in_range;
+pub use partition_evaluator::PartitionEvaluator;
 pub use window_expr::WindowExpr;
diff --git a/datafusion-physical-expr/src/window/partition_evaluator.rs b/datafusion-physical-expr/src/window/partition_evaluator.rs
new file mode 100644
index 0000000..9afdf38
--- /dev/null
+++ b/datafusion-physical-expr/src/window/partition_evaluator.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use datafusion_common::DataFusionError;
+use datafusion_common::Result;
+use std::ops::Range;
+
+/// Given a partition range, and the full list of sort partition points, given that the sort
+/// partition points are sorted using [partition columns..., order columns...], the split
+/// boundaries would align (what's sorted on [partition columns...] would definitely be sorted
+/// on finer columns), so this will use binary search to find ranges that are within the
+/// partition range and return the valid slice.
+pub fn find_ranges_in_range<'a>(
+    partition_range: &Range<usize>,
+    sort_partition_points: &'a [Range<usize>],
+) -> &'a [Range<usize>] {
+    let start_idx = sort_partition_points
+        .partition_point(|sort_range| sort_range.start < partition_range.start);
+    let end_idx = start_idx
+        + sort_partition_points[start_idx..]
+            .partition_point(|sort_range| sort_range.end <= partition_range.end);
+    &sort_partition_points[start_idx..end_idx]
+}
+
+/// Partition evaluator
+pub trait PartitionEvaluator {
+    /// Whether the evaluator should be evaluated with rank
+    fn include_rank(&self) -> bool {
+        false
+    }
+
+    /// evaluate the partition evaluator against the partitions
+    fn evaluate(&self, partition_points: Vec<Range<usize>>) -> Result<Vec<ArrayRef>> {
+        partition_points
+            .into_iter()
+            .map(|partition| self.evaluate_partition(partition))
+            .collect()
+    }
+
+    /// evaluate the partition evaluator against the partitions with rank information
+    fn evaluate_with_rank(
+        &self,
+        partition_points: Vec<Range<usize>>,
+        sort_partition_points: Vec<Range<usize>>,
+    ) -> Result<Vec<ArrayRef>> {
+        partition_points
+            .into_iter()
+            .map(|partition| {
+                let ranks_in_partition =
+                    find_ranges_in_range(&partition, &sort_partition_points);
+                self.evaluate_partition_with_rank(partition, ranks_in_partition)
+            })
+            .collect()
+    }
+
+    /// evaluate the partition evaluator against the partition
+    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>;
+
+    /// evaluate the partition evaluator against the partition but with rank
+    fn evaluate_partition_with_rank(
+        &self,
+        _partition: Range<usize>,
+        _ranks_in_partition: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        Err(DataFusionError::NotImplemented(
+            "evaluate_partition_with_rank is not implemented by default".into(),
+        ))
+    }
+}
diff --git a/datafusion-physical-expr/src/window_expr.rs b/datafusion-physical-expr/src/window/window_expr.rs
similarity index 100%
rename from datafusion-physical-expr/src/window_expr.rs
rename to datafusion-physical-expr/src/window/window_expr.rs
diff --git a/datafusion/src/physical_plan/expressions/cume_dist.rs b/datafusion/src/physical_plan/expressions/cume_dist.rs
index 7b0a45a..6f24d82 100644
--- a/datafusion/src/physical_plan/expressions/cume_dist.rs
+++ b/datafusion/src/physical_plan/expressions/cume_dist.rs
@@ -19,12 +19,13 @@
 //! at runtime during query execution
 
 use crate::error::Result;
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
 use arrow::array::ArrayRef;
 use arrow::array::Float64Array;
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::PartitionEvaluator;
 use std::any::Any;
 use std::iter;
 use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/lead_lag.rs b/datafusion/src/physical_plan/expressions/lead_lag.rs
index 52f80c3..fef5bad 100644
--- a/datafusion/src/physical_plan/expressions/lead_lag.rs
+++ b/datafusion/src/physical_plan/expressions/lead_lag.rs
@@ -19,13 +19,14 @@
 //! at runtime during query execution
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
 use crate::scalar::ScalarValue;
 use arrow::array::ArrayRef;
 use arrow::compute::cast;
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::PartitionEvaluator;
 use std::any::Any;
 use std::ops::Neg;
 use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs
index 64a526a..895a8b9 100644
--- a/datafusion/src/physical_plan/expressions/nth_value.rs
+++ b/datafusion/src/physical_plan/expressions/nth_value.rs
@@ -19,13 +19,14 @@
 //! that can evaluated at runtime during query execution
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
 use crate::scalar::ScalarValue;
 use arrow::array::{new_null_array, ArrayRef};
 use arrow::compute::kernels::window::shift;
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::PartitionEvaluator;
 use std::any::Any;
 use std::iter;
 use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/rank.rs b/datafusion/src/physical_plan/expressions/rank.rs
index 4facb8b..66697a0 100644
--- a/datafusion/src/physical_plan/expressions/rank.rs
+++ b/datafusion/src/physical_plan/expressions/rank.rs
@@ -19,12 +19,13 @@
 //! at runtime during query execution
 
 use crate::error::Result;
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
 use arrow::array::ArrayRef;
 use arrow::array::{Float64Array, UInt64Array};
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::PartitionEvaluator;
 use std::any::Any;
 use std::iter;
 use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs
index be87a35..4c349c1 100644
--- a/datafusion/src/physical_plan/expressions/row_number.rs
+++ b/datafusion/src/physical_plan/expressions/row_number.rs
@@ -18,11 +18,12 @@
 //! Defines physical expression for `row_number` that can evaluated at runtime during query execution
 
 use crate::error::Result;
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
 use arrow::array::{ArrayRef, UInt64Array};
 use arrow::datatypes::{DataType, Field};
 use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::PartitionEvaluator;
 use std::any::Any;
 use std::ops::Range;
 use std::sync::Arc;
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 9949982..32f2d5b 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -469,7 +469,8 @@ pub enum Distribution {
     HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
 }
 
-pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, WindowExpr};
+pub use datafusion_physical_expr::window::WindowExpr;
+pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
 
 /// Applies an optional projection to a [`SchemaRef`], returning the
 /// projected schema
diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs
index b5445ae..b667bfe 100644
--- a/datafusion/src/physical_plan/window_functions.rs
+++ b/datafusion/src/physical_plan/window_functions.rs
@@ -20,20 +20,11 @@
 //!
 //! see also <https://www.postgresql.org/docs/current/functions-window.html>
 
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
 use crate::physical_plan::functions::{TypeSignature, Volatility};
-use crate::physical_plan::{
-    aggregates, functions::Signature, type_coercion::data_types,
-    windows::find_ranges_in_range, PhysicalExpr,
-};
-use arrow::array::ArrayRef;
+use crate::physical_plan::{aggregates, functions::Signature, type_coercion::data_types};
 use arrow::datatypes::DataType;
-use arrow::datatypes::Field;
-use arrow::record_batch::RecordBatch;
 pub use datafusion_expr::{BuiltInWindowFunction, WindowFunction};
-use std::any::Any;
-use std::ops::Range;
-use std::sync::Arc;
 
 /// Returns the datatype of the window function
 pub fn return_type(
@@ -112,81 +103,6 @@ pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature {
     }
 }
 
-/// Partition evaluator
-pub(crate) trait PartitionEvaluator {
-    /// Whether the evaluator should be evaluated with rank
-    fn include_rank(&self) -> bool {
-        false
-    }
-
-    /// evaluate the partition evaluator against the partitions
-    fn evaluate(&self, partition_points: Vec<Range<usize>>) -> Result<Vec<ArrayRef>> {
-        partition_points
-            .into_iter()
-            .map(|partition| self.evaluate_partition(partition))
-            .collect()
-    }
-
-    /// evaluate the partition evaluator against the partitions with rank information
-    fn evaluate_with_rank(
-        &self,
-        partition_points: Vec<Range<usize>>,
-        sort_partition_points: Vec<Range<usize>>,
-    ) -> Result<Vec<ArrayRef>> {
-        partition_points
-            .into_iter()
-            .map(|partition| {
-                let ranks_in_partition =
-                    find_ranges_in_range(&partition, &sort_partition_points);
-                self.evaluate_partition_with_rank(partition, ranks_in_partition)
-            })
-            .collect()
-    }
-
-    /// evaluate the partition evaluator against the partition
-    fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>;
-
-    /// evaluate the partition evaluator against the partition but with rank
-    fn evaluate_partition_with_rank(
-        &self,
-        _partition: Range<usize>,
-        _ranks_in_partition: &[Range<usize>],
-    ) -> Result<ArrayRef> {
-        Err(DataFusionError::NotImplemented(
-            "evaluate_partition_with_rank is not implemented by default".into(),
-        ))
-    }
-}
-
-/// A window expression that is a built-in window function.
-///
-/// Note that unlike aggregation based window functions, built-in window functions normally ignore
-/// window frame spec, with the exception of first_value, last_value, and nth_value.
-pub(crate) trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
-    /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
-    /// downcast to a specific implementation.
-    fn as_any(&self) -> &dyn Any;
-
-    /// the field of the final result of this aggregation.
-    fn field(&self) -> Result<Field>;
-
-    /// expressions that are passed to the Accumulator.
-    /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
-    /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
-    /// implementation returns placeholder text.
-    fn name(&self) -> &str {
-        "BuiltInWindowFunctionExpr: default name"
-    }
-
-    /// Create built-in window evaluator with a batch
-    fn create_evaluator(
-        &self,
-        batch: &RecordBatch,
-    ) -> Result<Box<dyn PartitionEvaluator>>;
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/datafusion/src/physical_plan/windows/aggregate.rs b/datafusion/src/physical_plan/windows/aggregate.rs
index 4c97e2b..30e0b29 100644
--- a/datafusion/src/physical_plan/windows/aggregate.rs
+++ b/datafusion/src/physical_plan/windows/aggregate.rs
@@ -18,7 +18,6 @@
 //! Physical exec for aggregate window function expressions.
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::windows::find_ranges_in_range;
 use crate::physical_plan::{
     expressions::PhysicalSortExpr, Accumulator, AggregateExpr, PhysicalExpr, WindowExpr,
 };
@@ -26,6 +25,7 @@ use arrow::compute::concat;
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
 use datafusion_expr::{WindowFrame, WindowFrameUnits};
+use datafusion_physical_expr::window::find_ranges_in_range;
 use std::any::Any;
 use std::iter::IntoIterator;
 use std::ops::Range;
diff --git a/datafusion/src/physical_plan/windows/built_in.rs b/datafusion/src/physical_plan/windows/built_in.rs
index de627cb..3ded850 100644
--- a/datafusion/src/physical_plan/windows/built_in.rs
+++ b/datafusion/src/physical_plan/windows/built_in.rs
@@ -18,13 +18,11 @@
 //! Physical exec for built-in window function expressions.
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{
-    expressions::PhysicalSortExpr, window_functions::BuiltInWindowFunctionExpr,
-    PhysicalExpr, WindowExpr,
-};
+use crate::physical_plan::{expressions::PhysicalSortExpr, PhysicalExpr, WindowExpr};
 use arrow::compute::concat;
 use arrow::record_batch::RecordBatch;
 use arrow::{array::ArrayRef, datatypes::Field};
+use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
 use std::any::Any;
 use std::sync::Arc;
 
diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs
index b3bf9ce..34f9337 100644
--- a/datafusion/src/physical_plan/windows/mod.rs
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -25,17 +25,14 @@ use crate::physical_plan::{
         PhysicalSortExpr, RowNumber,
     },
     type_coercion::coerce,
-    window_functions::{
-        signature_for_built_in, BuiltInWindowFunction, BuiltInWindowFunctionExpr,
-        WindowFunction,
-    },
+    window_functions::{signature_for_built_in, BuiltInWindowFunction, WindowFunction},
     PhysicalExpr, WindowExpr,
 };
 use crate::scalar::ScalarValue;
 use arrow::datatypes::Schema;
 use datafusion_expr::WindowFrame;
+use datafusion_physical_expr::window::BuiltInWindowFunctionExpr;
 use std::convert::TryInto;
-use std::ops::Range;
 use std::sync::Arc;
 
 mod aggregate;
@@ -153,23 +150,6 @@ fn create_built_in_window_expr(
     })
 }
 
-/// Given a partition range, and the full list of sort partition points, given that the sort
-/// partition points are sorted using [partition columns..., order columns...], the split
-/// boundaries would align (what's sorted on [partition columns...] would definitely be sorted
-/// on finer columns), so this will use binary search to find ranges that are within the
-/// partition range and return the valid slice.
-pub(crate) fn find_ranges_in_range<'a>(
-    partition_range: &Range<usize>,
-    sort_partition_points: &'a [Range<usize>],
-) -> &'a [Range<usize>] {
-    let start_idx = sort_partition_points
-        .partition_point(|sort_range| sort_range.start < partition_range.start);
-    let end_idx = start_idx
-        + sort_partition_points[start_idx..]
-            .partition_point(|sort_range| sort_range.end <= partition_range.end);
-    &sort_partition_points[start_idx..end_idx]
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;