You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2024/01/28 13:01:50 UTC

(arrow-datafusion) branch main updated: Properly encode STRING_AGG, NTH_VALUE in physical plan protobufs (#9027)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 51b8982888 Properly encode STRING_AGG, NTH_VALUE in physical plan protobufs (#9027)
51b8982888 is described below

commit 51b898288830c224b825523f9be1d54974f15d2f
Author: Sean Smith <sc...@gmail.com>
AuthorDate: Sun Jan 28 07:01:44 2024 -0600

    Properly encode STRING_AGG, NTH_VALUE in physical plan protobufs (#9027)
    
    * Properly encode STRING_AGG in physical plan protobufs
    
    * reference issue for nth_value
---
 datafusion/proto/src/physical_plan/to_proto.rs     | 10 ++--
 .../proto/tests/cases/roundtrip_physical_plan.rs   | 55 ++++++++++++++++------
 2 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index f4e3f9e4dc..cff32ca2f8 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -49,9 +49,9 @@ use datafusion::physical_plan::expressions::{
     CastExpr, Column, Correlation, Count, Covariance, CovariancePop, CumeDist,
     DistinctArrayAgg, DistinctBitXor, DistinctCount, DistinctSum, FirstValue, Grouping,
     InListExpr, IsNotNullExpr, IsNullExpr, LastValue, LikeExpr, Literal, Max, Median,
-    Min, NegativeExpr, NotExpr, NthValue, Ntile, OrderSensitiveArrayAgg, Rank, RankType,
-    Regr, RegrType, RowNumber, Stddev, StddevPop, Sum, TryCastExpr, Variance,
-    VariancePop, WindowShift,
+    Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg,
+    Rank, RankType, Regr, RegrType, RowNumber, Stddev, StddevPop, StringAgg, Sum,
+    TryCastExpr, Variance, VariancePop, WindowShift,
 };
 use datafusion::physical_plan::udaf::AggregateFunctionExpr;
 use datafusion::physical_plan::windows::{BuiltInWindowExpr, PlainAggregateWindowExpr};
@@ -363,6 +363,10 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result<AggrFn> {
         protobuf::AggregateFunction::FirstValueAgg
     } else if aggr_expr.downcast_ref::<LastValue>().is_some() {
         protobuf::AggregateFunction::LastValueAgg
+    } else if aggr_expr.downcast_ref::<StringAgg>().is_some() {
+        protobuf::AggregateFunction::StringAgg
+    } else if aggr_expr.downcast_ref::<NthValueAgg>().is_some() {
+        protobuf::AggregateFunction::NthValueAgg
     } else {
         return not_impl_err!("Aggregate function not supported: {expr:?}");
     };
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 9a95e103c2..38eb390003 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -44,7 +44,8 @@ use datafusion::physical_plan::analyze::AnalyzeExec;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions::{
     binary, cast, col, in_list, like, lit, Avg, BinaryExpr, Column, DistinctCount,
-    GetFieldAccessExpr, GetIndexedFieldExpr, NotExpr, NthValue, PhysicalSortExpr, Sum,
+    GetFieldAccessExpr, GetIndexedFieldExpr, NotExpr, NthValue, PhysicalSortExpr,
+    StringAgg, Sum,
 };
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::insert::FileSinkExec;
@@ -328,20 +329,46 @@ fn rountrip_aggregate() -> Result<()> {
     let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
         vec![(col("a", &schema)?, "unused".to_string())];
 
-    let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
-        cast(col("b", &schema)?, &schema, DataType::Float64)?,
-        "AVG(b)".to_string(),
-        DataType::Float64,
-    ))];
+    let test_cases: Vec<Vec<Arc<dyn AggregateExpr>>> = vec![
+        // AVG
+        vec![Arc::new(Avg::new(
+            cast(col("b", &schema)?, &schema, DataType::Float64)?,
+            "AVG(b)".to_string(),
+            DataType::Float64,
+        ))],
+        // TODO: See <https://github.com/apache/arrow-datafusion/issues/9028>
+        // // NTH_VALUE
+        // vec![Arc::new(NthValueAgg::new(
+        //     col("b", &schema)?,
+        //     1,
+        //     "NTH_VALUE(b, 1)".to_string(),
+        //     DataType::Int64,
+        //     false,
+        //     Vec::new(),
+        //     Vec::new(),
+        // ))],
+        // STRING_AGG
+        vec![Arc::new(StringAgg::new(
+            cast(col("b", &schema)?, &schema, DataType::Utf8)?,
+            lit(ScalarValue::Utf8(Some(",".to_string()))),
+            "STRING_AGG(name, ',')".to_string(),
+            DataType::Utf8,
+        ))],
+    ];
 
-    roundtrip_test(Arc::new(AggregateExec::try_new(
-        AggregateMode::Final,
-        PhysicalGroupBy::new_single(groups.clone()),
-        aggregates.clone(),
-        vec![None],
-        Arc::new(EmptyExec::new(schema.clone())),
-        schema,
-    )?))
+    for aggregates in test_cases {
+        let schema = schema.clone();
+        roundtrip_test(Arc::new(AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::new_single(groups.clone()),
+            aggregates,
+            vec![None],
+            Arc::new(EmptyExec::new(schema.clone())),
+            schema,
+        )?))?;
+    }
+
+    Ok(())
 }
 
 #[test]