You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ji...@apache.org on 2022/02/19 09:08:21 UTC
[arrow-datafusion] 01/01: move built in window expr and partition evaluator
This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch move-window-partition-eval-expr
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
commit 9e24e33dc572d6538ab929d9db9d07a8e4c18c69
Author: Jiayu Liu <ji...@hey.com>
AuthorDate: Sat Feb 19 17:06:15 2022 +0800
move built in window expr and partition evaluator
---
datafusion-physical-expr/src/lib.rs | 3 +-
.../src/window/built_in_window_function_expr.rs | 53 +++++++++++++
.../src/{lib.rs => window/mod.rs} | 12 +--
.../src/window/partition_evaluator.rs | 84 +++++++++++++++++++++
.../src/{ => window}/window_expr.rs | 0
.../src/physical_plan/expressions/cume_dist.rs | 5 +-
.../src/physical_plan/expressions/lead_lag.rs | 5 +-
.../src/physical_plan/expressions/nth_value.rs | 5 +-
datafusion/src/physical_plan/expressions/rank.rs | 5 +-
.../src/physical_plan/expressions/row_number.rs | 5 +-
datafusion/src/physical_plan/mod.rs | 3 +-
datafusion/src/physical_plan/window_functions.rs | 88 +---------------------
datafusion/src/physical_plan/windows/aggregate.rs | 2 +-
datafusion/src/physical_plan/windows/built_in.rs | 6 +-
datafusion/src/physical_plan/windows/mod.rs | 24 +-----
15 files changed, 165 insertions(+), 135 deletions(-)
diff --git a/datafusion-physical-expr/src/lib.rs b/datafusion-physical-expr/src/lib.rs
index 63edaa5..6beffd1 100644
--- a/datafusion-physical-expr/src/lib.rs
+++ b/datafusion-physical-expr/src/lib.rs
@@ -18,9 +18,8 @@
mod aggregate_expr;
mod physical_expr;
mod sort_expr;
-mod window_expr;
+pub mod window;
pub use aggregate_expr::AggregateExpr;
pub use physical_expr::PhysicalExpr;
pub use sort_expr::PhysicalSortExpr;
-pub use window_expr::WindowExpr;
diff --git a/datafusion-physical-expr/src/window/built_in_window_function_expr.rs b/datafusion-physical-expr/src/window/built_in_window_function_expr.rs
new file mode 100644
index 0000000..43e1272
--- /dev/null
+++ b/datafusion-physical-expr/src/window/built_in_window_function_expr.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::partition_evaluator::PartitionEvaluator;
+use crate::PhysicalExpr;
+use arrow::datatypes::Field;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use std::any::Any;
+use std::sync::Arc;
+
+/// A window expression that is a built-in window function.
+///
+/// Note that unlike aggregation based window functions, built-in window functions normally ignore
+/// window frame spec, with the exception of first_value, last_value, and nth_value.
+pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
+ /// Returns this window function expression as [`Any`](std::any::Any) so that it can be
+ /// downcast to a specific implementation.
+ fn as_any(&self) -> &dyn Any;
+
+ /// The [`Field`] describing the final result of this window function.
+ fn field(&self) -> Result<Field>;
+
+ /// The physical expressions belonging to this window function.
+ /// NOTE(review): presumably its input arguments, evaluated against the batch -- confirm.
+ fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
+
+ /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
+ /// implementation returns placeholder text.
+ fn name(&self) -> &str {
+ "BuiltInWindowFunctionExpr: default name"
+ }
+
+ /// Creates the [`PartitionEvaluator`] for this window function from the given batch.
+ fn create_evaluator(
+ &self,
+ batch: &RecordBatch,
+ ) -> Result<Box<dyn PartitionEvaluator>>;
+}
diff --git a/datafusion-physical-expr/src/lib.rs b/datafusion-physical-expr/src/window/mod.rs
similarity index 78%
copy from datafusion-physical-expr/src/lib.rs
copy to datafusion-physical-expr/src/window/mod.rs
index 63edaa5..64f8b33 100644
--- a/datafusion-physical-expr/src/lib.rs
+++ b/datafusion-physical-expr/src/window/mod.rs
@@ -15,12 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-mod aggregate_expr;
-mod physical_expr;
-mod sort_expr;
-mod window_expr;
-
-pub use aggregate_expr::AggregateExpr;
-pub use physical_expr::PhysicalExpr;
-pub use sort_expr::PhysicalSortExpr;
-pub use window_expr::WindowExpr;
+pub mod built_in_window_function_expr;
+pub mod partition_evaluator;
+pub mod window_expr;
diff --git a/datafusion-physical-expr/src/window/partition_evaluator.rs b/datafusion-physical-expr/src/window/partition_evaluator.rs
new file mode 100644
index 0000000..9afdf38
--- /dev/null
+++ b/datafusion-physical-expr/src/window/partition_evaluator.rs
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::ArrayRef;
+use datafusion_common::DataFusionError;
+use datafusion_common::Result;
+use std::ops::Range;
+
+/// Returns the sub-slice of `sort_partition_points` whose ranges lie within `partition_range`.
+///
+/// Binary search (`partition_point`) first skips the ranges that start before
+/// `partition_range.start`, then keeps the following ranges that end at or before
+/// `partition_range.end`. Assumes the sort ranges are ordered on [partition columns..., order columns...], so their boundaries align with the partition's.
+pub fn find_ranges_in_range<'a>(
+ partition_range: &Range<usize>,
+ sort_partition_points: &'a [Range<usize>],
+) -> &'a [Range<usize>] {
+ let start_idx = sort_partition_points
+ .partition_point(|sort_range| sort_range.start < partition_range.start);
+ let end_idx = start_idx
+ + sort_partition_points[start_idx..]
+ .partition_point(|sort_range| sort_range.end <= partition_range.end);
+ &sort_partition_points[start_idx..end_idx]
+}
+
+/// Evaluator that computes result arrays for partitions, each described by a `Range<usize>`.
+pub trait PartitionEvaluator {
+ /// Whether the evaluator should be evaluated with rank information; defaults to `false`.
+ fn include_rank(&self) -> bool {
+ false
+ }
+
+ /// Runs `evaluate_partition` on every partition, yielding one array each or the first error.
+ fn evaluate(&self, partition_points: Vec<Range<usize>>) -> Result<Vec<ArrayRef>> {
+ partition_points
+ .into_iter()
+ .map(|partition| self.evaluate_partition(partition))
+ .collect()
+ }
+
+ /// Like `evaluate`, but passes each partition the sort ranges found inside it as ranks.
+ fn evaluate_with_rank(
+ &self,
+ partition_points: Vec<Range<usize>>,
+ sort_partition_points: Vec<Range<usize>>,
+ ) -> Result<Vec<ArrayRef>> {
+ partition_points
+ .into_iter()
+ .map(|partition| {
+ let ranks_in_partition =
+ find_ranges_in_range(&partition, &sort_partition_points);
+ self.evaluate_partition_with_rank(partition, ranks_in_partition)
+ })
+ .collect()
+ }
+
+ /// Evaluates a single partition into one array; the only method without a default body.
+ fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>;
+
+ /// Evaluates a single partition given its rank ranges; the default returns `NotImplemented`.
+ fn evaluate_partition_with_rank(
+ &self,
+ _partition: Range<usize>,
+ _ranks_in_partition: &[Range<usize>],
+ ) -> Result<ArrayRef> {
+ Err(DataFusionError::NotImplemented(
+ "evaluate_partition_with_rank is not implemented by default".into(),
+ ))
+ }
+}
diff --git a/datafusion-physical-expr/src/window_expr.rs b/datafusion-physical-expr/src/window/window_expr.rs
similarity index 100%
rename from datafusion-physical-expr/src/window_expr.rs
rename to datafusion-physical-expr/src/window/window_expr.rs
diff --git a/datafusion/src/physical_plan/expressions/cume_dist.rs b/datafusion/src/physical_plan/expressions/cume_dist.rs
index 7b0a45a..09c34c8 100644
--- a/datafusion/src/physical_plan/expressions/cume_dist.rs
+++ b/datafusion/src/physical_plan/expressions/cume_dist.rs
@@ -19,12 +19,13 @@
//! at runtime during query execution
use crate::error::Result;
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::partition_evaluator::PartitionEvaluator;
use std::any::Any;
use std::iter;
use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/lead_lag.rs b/datafusion/src/physical_plan/expressions/lead_lag.rs
index 52f80c3..c083da6 100644
--- a/datafusion/src/physical_plan/expressions/lead_lag.rs
+++ b/datafusion/src/physical_plan/expressions/lead_lag.rs
@@ -19,13 +19,14 @@
//! at runtime during query execution
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
use crate::scalar::ScalarValue;
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::partition_evaluator::PartitionEvaluator;
use std::any::Any;
use std::ops::Neg;
use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs
index 64a526a..1d7e98c 100644
--- a/datafusion/src/physical_plan/expressions/nth_value.rs
+++ b/datafusion/src/physical_plan/expressions/nth_value.rs
@@ -19,13 +19,14 @@
//! that can evaluated at runtime during query execution
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
use crate::scalar::ScalarValue;
use arrow::array::{new_null_array, ArrayRef};
use arrow::compute::kernels::window::shift;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::partition_evaluator::PartitionEvaluator;
use std::any::Any;
use std::iter;
use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/rank.rs b/datafusion/src/physical_plan/expressions/rank.rs
index 4facb8b..af47e10 100644
--- a/datafusion/src/physical_plan/expressions/rank.rs
+++ b/datafusion/src/physical_plan/expressions/rank.rs
@@ -19,12 +19,13 @@
//! at runtime during query execution
use crate::error::Result;
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::array::{Float64Array, UInt64Array};
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::partition_evaluator::PartitionEvaluator;
use std::any::Any;
use std::iter;
use std::ops::Range;
diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs
index be87a35..e415493 100644
--- a/datafusion/src/physical_plan/expressions/row_number.rs
+++ b/datafusion/src/physical_plan/expressions/row_number.rs
@@ -18,11 +18,12 @@
//! Defines physical expression for `row_number` that can evaluated at runtime during query execution
use crate::error::Result;
-use crate::physical_plan::window_functions::PartitionEvaluator;
-use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr};
+use crate::physical_plan::PhysicalExpr;
use arrow::array::{ArrayRef, UInt64Array};
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
+use datafusion_physical_expr::window::partition_evaluator::PartitionEvaluator;
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 9949982..e049304 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -469,7 +469,8 @@ pub enum Distribution {
HashPartitioned(Vec<Arc<dyn PhysicalExpr>>),
}
-pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, WindowExpr};
+pub use datafusion_physical_expr::window::window_expr::WindowExpr;
+pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs
index b5445ae..b667bfe 100644
--- a/datafusion/src/physical_plan/window_functions.rs
+++ b/datafusion/src/physical_plan/window_functions.rs
@@ -20,20 +20,11 @@
//!
//! see also <https://www.postgresql.org/docs/current/functions-window.html>
-use crate::error::{DataFusionError, Result};
+use crate::error::Result;
use crate::physical_plan::functions::{TypeSignature, Volatility};
-use crate::physical_plan::{
- aggregates, functions::Signature, type_coercion::data_types,
- windows::find_ranges_in_range, PhysicalExpr,
-};
-use arrow::array::ArrayRef;
+use crate::physical_plan::{aggregates, functions::Signature, type_coercion::data_types};
use arrow::datatypes::DataType;
-use arrow::datatypes::Field;
-use arrow::record_batch::RecordBatch;
pub use datafusion_expr::{BuiltInWindowFunction, WindowFunction};
-use std::any::Any;
-use std::ops::Range;
-use std::sync::Arc;
/// Returns the datatype of the window function
pub fn return_type(
@@ -112,81 +103,6 @@ pub(super) fn signature_for_built_in(fun: &BuiltInWindowFunction) -> Signature {
}
}
-/// Partition evaluator
-pub(crate) trait PartitionEvaluator {
- /// Whether the evaluator should be evaluated with rank
- fn include_rank(&self) -> bool {
- false
- }
-
- /// evaluate the partition evaluator against the partitions
- fn evaluate(&self, partition_points: Vec<Range<usize>>) -> Result<Vec<ArrayRef>> {
- partition_points
- .into_iter()
- .map(|partition| self.evaluate_partition(partition))
- .collect()
- }
-
- /// evaluate the partition evaluator against the partitions with rank information
- fn evaluate_with_rank(
- &self,
- partition_points: Vec<Range<usize>>,
- sort_partition_points: Vec<Range<usize>>,
- ) -> Result<Vec<ArrayRef>> {
- partition_points
- .into_iter()
- .map(|partition| {
- let ranks_in_partition =
- find_ranges_in_range(&partition, &sort_partition_points);
- self.evaluate_partition_with_rank(partition, ranks_in_partition)
- })
- .collect()
- }
-
- /// evaluate the partition evaluator against the partition
- fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef>;
-
- /// evaluate the partition evaluator against the partition but with rank
- fn evaluate_partition_with_rank(
- &self,
- _partition: Range<usize>,
- _ranks_in_partition: &[Range<usize>],
- ) -> Result<ArrayRef> {
- Err(DataFusionError::NotImplemented(
- "evaluate_partition_with_rank is not implemented by default".into(),
- ))
- }
-}
-
-/// A window expression that is a built-in window function.
-///
-/// Note that unlike aggregation based window functions, built-in window functions normally ignore
-/// window frame spec, with the exception of first_value, last_value, and nth_value.
-pub(crate) trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
- /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any;
-
- /// the field of the final result of this aggregation.
- fn field(&self) -> Result<Field>;
-
- /// expressions that are passed to the Accumulator.
- /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
- fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
-
- /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
- /// implementation returns placeholder text.
- fn name(&self) -> &str {
- "BuiltInWindowFunctionExpr: default name"
- }
-
- /// Create built-in window evaluator with a batch
- fn create_evaluator(
- &self,
- batch: &RecordBatch,
- ) -> Result<Box<dyn PartitionEvaluator>>;
-}
-
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/src/physical_plan/windows/aggregate.rs b/datafusion/src/physical_plan/windows/aggregate.rs
index 4c97e2b..5a6f1fe 100644
--- a/datafusion/src/physical_plan/windows/aggregate.rs
+++ b/datafusion/src/physical_plan/windows/aggregate.rs
@@ -18,7 +18,6 @@
//! Physical exec for aggregate window function expressions.
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::windows::find_ranges_in_range;
use crate::physical_plan::{
expressions::PhysicalSortExpr, Accumulator, AggregateExpr, PhysicalExpr, WindowExpr,
};
@@ -26,6 +25,7 @@ use arrow::compute::concat;
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use datafusion_expr::{WindowFrame, WindowFrameUnits};
+use datafusion_physical_expr::window::partition_evaluator::find_ranges_in_range;
use std::any::Any;
use std::iter::IntoIterator;
use std::ops::Range;
diff --git a/datafusion/src/physical_plan/windows/built_in.rs b/datafusion/src/physical_plan/windows/built_in.rs
index de627cb..4f1dfba 100644
--- a/datafusion/src/physical_plan/windows/built_in.rs
+++ b/datafusion/src/physical_plan/windows/built_in.rs
@@ -18,13 +18,11 @@
//! Physical exec for built-in window function expressions.
use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{
- expressions::PhysicalSortExpr, window_functions::BuiltInWindowFunctionExpr,
- PhysicalExpr, WindowExpr,
-};
+use crate::physical_plan::{expressions::PhysicalSortExpr, PhysicalExpr, WindowExpr};
use arrow::compute::concat;
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
use std::any::Any;
use std::sync::Arc;
diff --git a/datafusion/src/physical_plan/windows/mod.rs b/datafusion/src/physical_plan/windows/mod.rs
index b3bf9ce..9deab56 100644
--- a/datafusion/src/physical_plan/windows/mod.rs
+++ b/datafusion/src/physical_plan/windows/mod.rs
@@ -25,17 +25,14 @@ use crate::physical_plan::{
PhysicalSortExpr, RowNumber,
},
type_coercion::coerce,
- window_functions::{
- signature_for_built_in, BuiltInWindowFunction, BuiltInWindowFunctionExpr,
- WindowFunction,
- },
+ window_functions::{signature_for_built_in, BuiltInWindowFunction, WindowFunction},
PhysicalExpr, WindowExpr,
};
use crate::scalar::ScalarValue;
use arrow::datatypes::Schema;
use datafusion_expr::WindowFrame;
+use datafusion_physical_expr::window::built_in_window_function_expr::BuiltInWindowFunctionExpr;
use std::convert::TryInto;
-use std::ops::Range;
use std::sync::Arc;
mod aggregate;
@@ -153,23 +150,6 @@ fn create_built_in_window_expr(
})
}
-/// Given a partition range, and the full list of sort partition points, given that the sort
-/// partition points are sorted using [partition columns..., order columns...], the split
-/// boundaries would align (what's sorted on [partition columns...] would definitely be sorted
-/// on finer columns), so this will use binary search to find ranges that are within the
-/// partition range and return the valid slice.
-pub(crate) fn find_ranges_in_range<'a>(
- partition_range: &Range<usize>,
- sort_partition_points: &'a [Range<usize>],
-) -> &'a [Range<usize>] {
- let start_idx = sort_partition_points
- .partition_point(|sort_range| sort_range.start < partition_range.start);
- let end_idx = start_idx
- + sort_partition_points[start_idx..]
- .partition_point(|sort_range| sort_range.end <= partition_range.end);
- &sort_partition_points[start_idx..end_idx]
-}
-
#[cfg(test)]
mod tests {
use super::*;