You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2024/01/28 13:01:50 UTC
(arrow-datafusion) branch main updated: Properly encode STRING_AGG, NTH_VALUE in physical plan protobufs (#9027)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 51b8982888 Properly encode STRING_AGG, NTH_VALUE in physical plan protobufs (#9027)
51b8982888 is described below
commit 51b898288830c224b825523f9be1d54974f15d2f
Author: Sean Smith <sc...@gmail.com>
AuthorDate: Sun Jan 28 07:01:44 2024 -0600
Properly encode STRING_AGG, NTH_VALUE in physical plan protobufs (#9027)
* Properly encode STRING_AGG in physical plan protobufs
* reference issue for nth_value
---
datafusion/proto/src/physical_plan/to_proto.rs | 10 ++--
.../proto/tests/cases/roundtrip_physical_plan.rs | 55 ++++++++++++++++------
2 files changed, 48 insertions(+), 17 deletions(-)
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index f4e3f9e4dc..cff32ca2f8 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -49,9 +49,9 @@ use datafusion::physical_plan::expressions::{
CastExpr, Column, Correlation, Count, Covariance, CovariancePop, CumeDist,
DistinctArrayAgg, DistinctBitXor, DistinctCount, DistinctSum, FirstValue, Grouping,
InListExpr, IsNotNullExpr, IsNullExpr, LastValue, LikeExpr, Literal, Max, Median,
- Min, NegativeExpr, NotExpr, NthValue, Ntile, OrderSensitiveArrayAgg, Rank, RankType,
- Regr, RegrType, RowNumber, Stddev, StddevPop, Sum, TryCastExpr, Variance,
- VariancePop, WindowShift,
+ Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg,
+ Rank, RankType, Regr, RegrType, RowNumber, Stddev, StddevPop, StringAgg, Sum,
+ TryCastExpr, Variance, VariancePop, WindowShift,
};
use datafusion::physical_plan::udaf::AggregateFunctionExpr;
use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr};
@@ -363,6 +363,10 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result<AggrFn> {
protobuf::AggregateFunction::FirstValueAgg
} else if aggr_expr.downcast_ref::<LastValue>().is_some() {
protobuf::AggregateFunction::LastValueAgg
+ } else if aggr_expr.downcast_ref::<StringAgg>().is_some() {
+ protobuf::AggregateFunction::StringAgg
+ } else if aggr_expr.downcast_ref::<NthValueAgg>().is_some() {
+ protobuf::AggregateFunction::NthValueAgg
} else {
return not_impl_err!("Aggregate function not supported: {expr:?}");
};
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 9a95e103c2..38eb390003 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -44,7 +44,8 @@ use datafusion::physical_plan::analyze::AnalyzeExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::{
binary, cast, col, in_list, like, lit, Avg, BinaryExpr, Column, DistinctCount,
- GetFieldAccessExpr, GetIndexedFieldExpr, NotExpr, NthValue, PhysicalSortExpr, Sum,
+ GetFieldAccessExpr, GetIndexedFieldExpr, NotExpr, NthValue, PhysicalSortExpr,
+ StringAgg, Sum,
};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::insert::FileSinkExec;
@@ -328,20 +329,46 @@ fn rountrip_aggregate() -> Result<()> {
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
vec![(col("a", &schema)?, "unused".to_string())];
- let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
- cast(col("b", &schema)?, &schema, DataType::Float64)?,
- "AVG(b)".to_string(),
- DataType::Float64,
- ))];
+ let test_cases: Vec<Vec<Arc<dyn AggregateExpr>>> = vec![
+ // AVG
+ vec![Arc::new(Avg::new(
+ cast(col("b", &schema)?, &schema, DataType::Float64)?,
+ "AVG(b)".to_string(),
+ DataType::Float64,
+ ))],
+ // TODO: Re-enable the commented-out NTH_VALUE case below once the linked issue is resolved; see <https://github.com/apache/arrow-datafusion/issues/9028>
+ // // NTH_VALUE
+ // vec![Arc::new(NthValueAgg::new(
+ // col("b", &schema)?,
+ // 1,
+ // "NTH_VALUE(b, 1)".to_string(),
+ // DataType::Int64,
+ // false,
+ // Vec::new(),
+ // Vec::new(),
+ // ))],
+ // STRING_AGG
+ vec![Arc::new(StringAgg::new(
+ cast(col("b", &schema)?, &schema, DataType::Utf8)?,
+ lit(ScalarValue::Utf8(Some(",".to_string()))),
+ "STRING_AGG(name, ',')".to_string(),
+ DataType::Utf8,
+ ))],
+ ];
- roundtrip_test(Arc::new(AggregateExec::try_new(
- AggregateMode::Final,
- PhysicalGroupBy::new_single(groups.clone()),
- aggregates.clone(),
- vec![None],
- Arc::new(EmptyExec::new(schema.clone())),
- schema,
- )?))
+ for aggregates in test_cases {
+ let schema = schema.clone();
+ roundtrip_test(Arc::new(AggregateExec::try_new(
+ AggregateMode::Final,
+ PhysicalGroupBy::new_single(groups.clone()),
+ aggregates,
+ vec![None],
+ Arc::new(EmptyExec::new(schema.clone())),
+ schema,
+ )?))?;
+ }
+
+ Ok(())
}
#[test]