Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/01 02:09:22 UTC

[GitHub] [arrow-datafusion] liukun4515 opened a new pull request #1387: Add new framework expr coercion

liukun4515 opened a new pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387


   # Which issue does this PR close?
   
   
   related #1356
   
   
   # Rationale for this change
   
   # What changes are included in this PR?
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r760561390



##########
File path: datafusion/src/execution/context.rs
##########
@@ -2058,7 +2058,7 @@ mod tests {
         .await
         .unwrap_err();
 
-        assert_eq!(results.to_string(), "Error during planning: Coercion from [Timestamp(Nanosecond, None)] to the signature Uniform(1, [Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64]) failed.");
+        assert_eq!(results.to_string(), "Error during planning: The function Sum do not support the Timestamp(Nanosecond, None).");

Review comment:
       Would it be possible to add the valid signatures to this error message? The new wording is more readable, but we lost some information about which type signatures are valid.
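
A minimal sketch of how the accepted types could be folded back into the message. `ArgType`, `SUM_SUPPORTED` and `unsupported_type_error` are hypothetical stand-ins so the snippet compiles on its own; they are not DataFusion APIs. The real code would presumably work with arrow's `DataType` and return a `DataFusionError::Plan`.

```rust
/// Hypothetical stand-in for arrow's `DataType`, so the sketch compiles alone.
#[derive(Debug, Clone, PartialEq)]
enum ArgType {
    Int64,
    Float64,
    Utf8,
}

/// Assumed list of types accepted by the aggregate (illustrative only).
const SUM_SUPPORTED: &[ArgType] = &[ArgType::Int64, ArgType::Float64];

/// Build a message that names both the rejected type and the accepted ones.
fn unsupported_type_error(fun: &str, got: &ArgType, supported: &[ArgType]) -> String {
    format!(
        "The function {} does not support inputs of type {:?}. Supported types: {:?}",
        fun, got, supported
    )
}
```

Calling `unsupported_type_error("Sum", &ArgType::Utf8, SUM_SUPPORTED)` keeps the readable wording while still listing what would have been accepted.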

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",

Review comment:
       ```suggestion
                   return Err(DataFusionError::Plan(format!("The function {:?} expects {:?} arguments, but {:?} were provided",
   ```
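
For illustration, the suggested wording could sit in a small standalone check like the one below. This is only a sketch: `check_arg_count` is a hypothetical helper, and the `&str` name and `String` error stand in for `AggregateFunction` and `DataFusionError::Plan`.

```rust
/// Return an error message when the number of arguments does not match.
/// `fun` and the `String` error are stand-ins for the real DataFusion types.
fn check_arg_count(fun: &str, expected: usize, provided: usize) -> Result<(), String> {
    if provided != expected {
        return Err(format!(
            "The function {} expects {} arguments, but {} were provided",
            fun, expected, provided
        ));
    }
    Ok(())
}
```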

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {

Review comment:
       I think min and max should only have a single input argument (`input_types.len()` is 1), so this is probably ok. I suggest also adding an explicit check like `assert_eq!(input_types.len(), 1)` as a defensive measure.
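
A rough sketch of what the defensive check might look like next to the dictionary unpacking. `ArgType` and `min_max_result_type` are hypothetical stand-ins for arrow's `DataType` and the function under review, chosen so the snippet is self-contained.

```rust
/// Hypothetical stand-in for arrow's `DataType`.
#[derive(Debug, Clone, PartialEq)]
enum ArgType {
    Int32,
    Utf8,
    /// Dictionary(key type, value type)
    Dictionary(Box<ArgType>, Box<ArgType>),
}

fn min_max_result_type(input_types: &[ArgType]) -> Vec<ArgType> {
    // Defensive check: MIN and MAX take exactly one argument,
    // so indexing `input_types[0]` below cannot go out of bounds.
    assert_eq!(input_types.len(), 1);
    match &input_types[0] {
        // Unpack the dictionary and use its value type.
        ArgType::Dictionary(_, value_type) => vec![value_type.as_ref().clone()],
        other => vec![other.clone()],
    }
}
```

The assertion panics rather than returning an error, which fits a "should never happen" invariant; returning a planning error instead would be the alternative if callers can reach this path with bad input.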

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(

Review comment:
       I think some docstrings would help here (perhaps a pointer to the module level documentation as is done in `type_coercion.rs`)

##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -262,6 +266,131 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
 mod tests {
     use super::*;
     use crate::error::Result;
+    use crate::physical_plan::expressions::{ApproxDistinct, ArrayAgg, Count, Max, Min};
+
+    #[test]
+    fn test_count_arragg_approx_expr() -> Result<()> {
+        let funcs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+        ];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Count => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Count>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ApproxDistinct => {
+                        assert!(result_agg_phy_exprs.as_any().is::<ApproxDistinct>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, false),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ArrayAgg => {
+                        assert!(result_agg_phy_exprs.as_any().is::<ArrayAgg>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new(
+                                "c1",
+                                DataType::List(Box::new(Field::new(
+                                    "item",
+                                    data_type.clone(),
+                                    true
+                                ))),
+                                false
+                            ),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_min_max_expr() -> Result<()> {
+        let funcs = vec![AggregateFunction::Min, AggregateFunction::Max];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Min => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Min>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::Max => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Max>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_sum_avg_expr() -> Result<()> {
+        // TODO

Review comment:
       Do you intend to complete this `TODO` in this PR?

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported
+        // For example, the `Struct` and `Map` type are not supported in the MIN and MAX function
+        _ => Ok(input_types.to_vec()),
+    }
+}
+
+pub fn coerce_exprs(
+    agg_fun: &AggregateFunction,
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    schema: &Schema,
+    signature: &Signature,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    if input_exprs.is_empty() {
+        return Ok(vec![]);
+    }
+    let input_types = input_exprs
+        .iter()
+        .map(|e| e.data_type(schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    // get the coerced data types
+    let coerced_types = coerce_types(agg_fun, &input_types, signature)?;
+
+    // try cast if need
+    input_exprs
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| try_cast(expr.clone(), schema, coerced_types[i].clone()))
+        .collect::<Result<Vec<_>>>()
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::physical_plan::aggregates;
+    use crate::physical_plan::aggregates::{signature, AggregateFunction};
+    use crate::physical_plan::coercion_rule::aggregate_rule::coerce_types;
+    use arrow::datatypes::DataType;
+
+    #[test]
+    fn test_aggregate_coerce_types() {
+        // test input args with error number input types
+        let fun = AggregateFunction::Min;
+        let input_types = vec![DataType::Int64, DataType::Int32];
+        let signature = signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!("Error during planning: The function Min expect argument number is 1, but the input argument number is 2", result.unwrap_err().to_string());
+
+        // test input args is invalid data type for sum or avg
+        let fun = AggregateFunction::Sum;
+        let input_types = vec![DataType::Utf8];
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Sum do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+        let fun = AggregateFunction::Avg;
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Avg do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+
+        // test count, array_agg, approx_distinct, min, max.
+        // the coerced types is same with input types
+        let funs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+            AggregateFunction::Min,
+            AggregateFunction::Max,
+        ];
+        let input_types = vec![
+            vec![DataType::Int32],
+            // vec![DataType::Decimal(10, 2)],

Review comment:
       Did you mean to leave this commented out?

##########
File path: datafusion/tests/sql.rs
##########
@@ -5612,10 +5612,9 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx).await?;
     let sql = "SELECT COUNT(DISTINCT) FROM aggregate_test_100";
-    let logical_plan = ctx.create_logical_plan(sql)?;
-    let physical_plan = ctx.create_physical_plan(&logical_plan).await;
-    let err = physical_plan.unwrap_err();
-    assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'");
+    let logical_plan = ctx.create_logical_plan(sql);
+    let err = logical_plan.unwrap_err();
+    assert_eq!(err.to_string(), DataFusionError::Plan("The function Count expect argument number is 1, but the input argument number is 0".to_string()).to_string());

Review comment:
       That is a nicer error message for sure 👍 

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported
+        // For example, the `Struct` and `Map` type are not supported in the MIN and MAX function
+        _ => Ok(input_types.to_vec()),
+    }
+}
+
+pub fn coerce_exprs(
+    agg_fun: &AggregateFunction,
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    schema: &Schema,
+    signature: &Signature,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    if input_exprs.is_empty() {
+        return Ok(vec![]);
+    }
+    let input_types = input_exprs
+        .iter()
+        .map(|e| e.data_type(schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    // get the coerced data types
+    let coerced_types = coerce_types(agg_fun, &input_types, signature)?;
+
+    // try cast if need
+    input_exprs
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| try_cast(expr.clone(), schema, coerced_types[i].clone()))
+        .collect::<Result<Vec<_>>>()

Review comment:
       You might be able to avoid some clones here by using `into_iter()` and `zip` https://doc.rust-lang.org/std/iter/struct.Zip.html
   
   Something like (untested):
   ```suggestion
       input_exprs
           .iter().zip(coerced_types.into_iter())
           .map(|(expr, coerced_type)| try_cast(expr.clone(), schema, coerced_type))
           .collect::<Result<Vec<_>>>()
   ```
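
The `zip` idea can be tried in isolation with the sketch below. `cast_to` and `cast_all` are hypothetical stand-ins for `try_cast` and `coerce_exprs`, with plain strings in place of physical expressions and data types, so this only demonstrates the iterator pattern and not the DataFusion API.

```rust
/// Hypothetical stand-in for `try_cast`: combines an expression name with its target type.
fn cast_to(expr: &str, target: String) -> Result<String, String> {
    if target.is_empty() {
        return Err(format!("no target type for {}", expr));
    }
    Ok(format!("CAST({} AS {})", expr, target))
}

fn cast_all(exprs: &[&str], coerced_types: Vec<String>) -> Result<Vec<String>, String> {
    exprs
        .iter()
        // `into_iter()` yields owned `String`s, so the target type is moved
        // into `cast_to` instead of being cloned per element.
        .zip(coerced_types.into_iter())
        .map(|(expr, coerced_type)| cast_to(*expr, coerced_type))
        // Collecting into `Result` stops at the first error.
        .collect::<Result<Vec<_>, _>>()
}
```

One thing to keep in mind: `zip` stops at the shorter of the two iterators, so a length mismatch would be silently truncated, whereas indexing with `coerced_types[i]` would panic.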

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",

Review comment:
       ```suggestion
                       "The function {:?} does not support inputs of type {:?}.",
   ```

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));

Review comment:
       ```suggestion
           _ => {
               return Err(DataFusionError::Internal(format!(
                   "Aggregate functions only support uniform signatures. Got {:?}",
                   signature
               )));
   ```
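
A minimal sketch of the distinction this suggestion draws, assuming stand-in types rather than the real DataFusion ones: `PlanError`, the simplified `TypeSignature`, and `check_signature` are hypothetical names used only for illustration. A wrong argument count is a user-facing planning error, while an unexpected signature kind points at a bug in the engine and is reported as an internal error.

```rust
// Stand-ins for DataFusion's error and signature types.
// These are illustrative assumptions, not the crate's real definitions.
#[derive(Debug, PartialEq)]
enum PlanError {
    Plan(String),
    Internal(String),
}

#[derive(Debug)]
enum TypeSignature {
    Uniform(usize),
    Any(usize),
    Variadic,
}

/// Checks the argument count for the signature kinds the aggregate rule handles.
fn check_signature(
    fun_name: &str,
    signature: &TypeSignature,
    num_inputs: usize,
) -> Result<(), PlanError> {
    match signature {
        TypeSignature::Uniform(expected) | TypeSignature::Any(expected) => {
            if num_inputs != *expected {
                // The query itself is wrong: report a planning error.
                return Err(PlanError::Plan(format!(
                    "The function {} expects {} arguments, but {} were provided",
                    fun_name, expected, num_inputs
                )));
            }
            Ok(())
        }
        // No aggregate should declare any other signature kind,
        // so reaching this arm indicates an internal inconsistency.
        other => Err(PlanError::Internal(format!(
            "Aggregate functions only support uniform signatures. Got {:?}",
            other
        ))),
    }
}
```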

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported

Review comment:
       That is a good TODO.
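
One possible shape for such a checker, as a rough sketch only. It assumes a small stand-in `DataType` enum (not arrow's) and two hypothetical helpers, `is_min_max_supported` and `min_max_coerced_type`. The dictionary is unpacked first, then the resulting type is validated.

```rust
// Stand-in for arrow's DataType with just enough variants to show the idea.
// The enum and both helpers are illustrative assumptions.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    Struct,
    Map,
    Dictionary(Box<DataType>, Box<DataType>),
}

/// Returns false for the complex types that MIN and MAX cannot compare.
/// A dictionary reaching this check is a nested one and is rejected too.
fn is_min_max_supported(data_type: &DataType) -> bool {
    !matches!(
        data_type,
        DataType::Struct | DataType::Map | DataType::Dictionary(_, _)
    )
}

/// Unpacks a dictionary to its value type, then validates the result.
fn min_max_coerced_type(input: &DataType) -> Result<DataType, String> {
    let candidate = match input {
        DataType::Dictionary(_, value_type) => &**value_type,
        other => other,
    };
    if is_min_max_supported(candidate) {
        Ok(candidate.clone())
    } else {
        Err(format!(
            "MIN/MAX does not support inputs of type {:?}",
            candidate
        ))
    }
}
```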

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",

Review comment:
       ```suggestion
                       "The function {:?} does not support inputs of type {:?}.",
   ```

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported
+        // For example, the `Struct` and `Map` type are not supported in the MIN and MAX function
+        _ => Ok(input_types.to_vec()),
+    }
+}
+
+pub fn coerce_exprs(
+    agg_fun: &AggregateFunction,
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    schema: &Schema,
+    signature: &Signature,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    if input_exprs.is_empty() {
+        return Ok(vec![]);
+    }
+    let input_types = input_exprs
+        .iter()
+        .map(|e| e.data_type(schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    // get the coerced data types
+    let coerced_types = coerce_types(agg_fun, &input_types, signature)?;
+
+    // try cast if need
+    input_exprs
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| try_cast(expr.clone(), schema, coerced_types[i].clone()))
+        .collect::<Result<Vec<_>>>()
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::physical_plan::aggregates;
+    use crate::physical_plan::aggregates::{signature, AggregateFunction};
+    use crate::physical_plan::coercion_rule::aggregate_rule::coerce_types;
+    use arrow::datatypes::DataType;
+
+    #[test]
+    fn test_aggregate_coerce_types() {
+        // test input args with error number input types
+        let fun = AggregateFunction::Min;
+        let input_types = vec![DataType::Int64, DataType::Int32];
+        let signature = signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!("Error during planning: The function Min expect argument number is 1, but the input argument number is 2", result.unwrap_err().to_string());
+
+        // test input args is invalid data type for sum or avg
+        let fun = AggregateFunction::Sum;
+        let input_types = vec![DataType::Utf8];
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Sum do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+        let fun = AggregateFunction::Avg;
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Avg do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+
+        // test count, array_agg, approx_distinct, min, max.
+        // the coerced types is same with input types
+        let funs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+            AggregateFunction::Min,
+            AggregateFunction::Max,
+        ];
+        let input_types = vec![
+            vec![DataType::Int32],
+            // vec![DataType::Decimal(10, 2)],
+            vec![DataType::Utf8],
+        ];
+        for fun in funs {
+            for input_type in &input_types {
+                let signature = aggregates::signature(&fun);
+                let result = coerce_types(&fun, input_type, &signature);
+                assert_eq!(*input_type, result.unwrap());
+            }
+        }
+        // test sum, avg
+        let funs = vec![AggregateFunction::Sum, AggregateFunction::Avg];
+        let input_types = vec![
+            vec![DataType::Int32],
+            vec![DataType::Float32],
+            // vec![DataType::Decimal(20, 3)],

Review comment:
       Likewise, I think this should work now.

##########
File path: datafusion/src/physical_plan/expressions/average.rs
##########
@@ -62,6 +62,24 @@ pub fn avg_return_type(arg_type: &DataType) -> Result<DataType> {
     }
 }
 
+pub(crate) fn is_avg_support_arg_type(arg_type: &DataType) -> bool {
+    // TODO support the interval
+    // TODO: do we need to support the unsigned data type?

Review comment:
       What do you mean by `the unsigned data type`? If you mean "do we support AVG(UInt8)" I think the answer is yes.
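
If unsigned integers are accepted, as this comment suggests, the predicate could look roughly like the sketch below. The `DataType` enum here is a local stand-in (an assumption so the snippet compiles without the arrow crate), and the body is a guess at the intent rather than the PR's actual implementation. Decimal and interval handling are left out.

```rust
// Stand-in for arrow's DataType; illustrative only.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    UInt8,
    UInt16,
    UInt32,
    UInt64,
    Int8,
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
    Utf8,
}

/// Accepts every signed, unsigned, and floating-point numeric type for AVG.
fn is_avg_support_arg_type(arg_type: &DataType) -> bool {
    matches!(
        arg_type,
        DataType::UInt8
            | DataType::UInt16
            | DataType::UInt32
            | DataType::UInt64
            | DataType::Int8
            | DataType::Int16
            | DataType::Int32
            | DataType::Int64
            | DataType::Float32
            | DataType::Float64
    )
}
```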

##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -60,6 +60,7 @@ pub mod helpers {
 
 pub use approx_distinct::ApproxDistinct;
 pub use array_agg::ArrayAgg;
+pub(crate) use average::is_avg_support_arg_type;

Review comment:
       I don't think we need to export this function (i.e. there is no need to declare it `pub(crate)` here). Likewise below.

##########
File path: datafusion/src/physical_plan/coercion_rule/mod.rs
##########
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! define the coercion rule for different Expr type

Review comment:
       ```suggestion
   //! define coercion rules for aggregate functions
   ```







[GitHub] [arrow-datafusion] liukun4515 edited a comment on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 edited a comment on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-987824947


   @alamb  PTAL 
   It can be merged if it looks good to you.





[GitHub] [arrow-datafusion] liukun4515 commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-985120685


   @xudong963 I will start addressing the comments next week.
   This pull request is not urgent.
   





[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763649897



##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -87,35 +88,38 @@ impl FromStr for AggregateFunction {
                 return Err(DataFusionError::Plan(format!(
                     "There is no built-in function named {}",
                     name
-                )))
+                )));
             }
         })
     }
 }
 
-/// Returns the datatype of the aggregation function
+/// Returns the datatype of the aggregate function.
+/// This is used to get the returned data type for aggregate expr.
 pub fn return_type(
     fun: &AggregateFunction,
     input_expr_types: &[DataType],
 ) -> Result<DataType> {
     // Note that this function *must* return the same type that the respective physical expression returns
     // or the execution panics.
 
-    // verify that this is a valid set of data types for this function
-    data_types(input_expr_types, &signature(fun))?;
+    let coerced_data_types = coerce_types(fun, input_expr_types, &signature(fun))?;
 
     match fun {
+        // TODO If the datafusion is compatible with PostgreSQL, the returned data type should be INT64.

Review comment:
       `UINT64` is used as the result type of the `COUNT` aggregate in many places; this will be resolved in https://github.com/apache/arrow-datafusion/issues/1379







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r764400376



##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -262,6 +266,199 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
 mod tests {
     use super::*;
     use crate::error::Result;
+    use crate::physical_plan::expressions::{
+        ApproxDistinct, ArrayAgg, Avg, Count, Max, Min, Sum,
+    };
+
+    #[test]
+    fn test_count_arragg_approx_expr() -> Result<()> {
+        let funcs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+        ];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Count => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Count>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ApproxDistinct => {
+                        assert!(result_agg_phy_exprs.as_any().is::<ApproxDistinct>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, false),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ArrayAgg => {
+                        assert!(result_agg_phy_exprs.as_any().is::<ArrayAgg>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new(
+                                "c1",
+                                DataType::List(Box::new(Field::new(
+                                    "item",
+                                    data_type.clone(),
+                                    true
+                                ))),
+                                false
+                            ),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_min_max_expr() -> Result<()> {
+        let funcs = vec![AggregateFunction::Min, AggregateFunction::Max];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Min => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Min>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::Max => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Max>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_sum_avg_expr() -> Result<()> {
+        let funcs = vec![AggregateFunction::Sum, AggregateFunction::Avg];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::UInt64,
+            DataType::Int32,
+            DataType::Int64,
+            DataType::Float32,
+            DataType::Float64,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Sum => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Sum>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        let mut expect_type = data_type.clone();
+                        if matches!(
+                            data_type,
+                            DataType::UInt8
+                                | DataType::UInt16
+                                | DataType::UInt32
+                                | DataType::UInt64
+                        ) {
+                            expect_type = DataType::UInt64;
+                        } else if matches!(
+                            data_type,
+                            DataType::Int8
+                                | DataType::Int16
+                                | DataType::Int32
+                                | DataType::Int64
+                        ) {
+                            expect_type = DataType::Int64;
+                        } else if matches!(
+                            data_type,
+                            DataType::Float32 | DataType::Float64
+                        ) {
+                            expect_type = data_type.clone();
+                        }

Review comment:
       Just FYI, you can write this kind of logic more concisely with something like the following (untested and abbreviated):
   
   ```rust
   let expect_type = match data_type {
     DataType::UInt8 | .... => DataType::UInt64,
     DataType::Int8 | .... => DataType::Int64,
     _ => data_type.clone(),
   };
   ```
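
For reference, here is a fuller sketch of the `match` form suggested above. To keep it compilable without the arrow crate, it uses a local stand-in `DataType` enum (an assumption, not the real arrow type), and the helper name `expected_sum_type` is hypothetical. The widening rules mirror the ones the test spells out with `if matches!(...)`.

```rust
// Local stand-in for arrow's `DataType`, reduced to the variants the test
// touches. This enum is an assumption made so the sketch needs no external crate.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    UInt8,
    UInt16,
    UInt32,
    UInt64,
    Int8,
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
}

// Hypothetical helper: the type the test expects SUM to return for a given
// input type. Unsigned integers widen to UInt64, signed integers widen to
// Int64, and everything else (here: the floats) keeps its input type.
fn expected_sum_type(data_type: &DataType) -> DataType {
    match data_type {
        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
            DataType::UInt64
        }
        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
            DataType::Int64
        }
        _ => data_type.clone(),
    }
}
```

In the test, `let expect_type = expected_sum_type(data_type);` could then replace the `let mut expect_type` binding and the `if`/`else if` chain.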







[GitHub] [arrow-datafusion] xudong963 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
xudong963 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r762540770



##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -87,35 +88,38 @@ impl FromStr for AggregateFunction {
                 return Err(DataFusionError::Plan(format!(
                     "There is no built-in function named {}",
                     name
-                )))
+                )));
             }
         })
     }
 }
 
-/// Returns the datatype of the aggregation function
+/// Returns the datatype of the aggregate function.
+/// This is used to get the returned data type for aggregate expr.
 pub fn return_type(
     fun: &AggregateFunction,
     input_expr_types: &[DataType],
 ) -> Result<DataType> {
     // Note that this function *must* return the same type that the respective physical expression returns
     // or the execution panics.
 
-    // verify that this is a valid set of data types for this function
-    data_types(input_expr_types, &signature(fun))?;
+    let coerced_data_types = coerce_types(fun, input_expr_types, &signature(fun))?;
 
     match fun {
+        // TODO If the datafusion is compatible with PostgreSQL, the returned data type should be INT64.

Review comment:
       Maybe we can fix this so the behavior matches PostgreSQL?







[GitHub] [arrow-datafusion] liukun4515 commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-987824947


   @alamb  PTAL 





[GitHub] [arrow-datafusion] xudong963 commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
xudong963 commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-984725370


   Thanks for your work, @liukun4515! If possible (that is, if the PR has not been merged by then), I'll take a look over the weekend. These days I am very busy with my job 🤦‍♂️, working 12+ hours per day.





[GitHub] [arrow-datafusion] liukun4515 commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-984244370


   Thank you for your review and grammar suggestions.
   I will address them later.





[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763692008



##########
File path: datafusion/src/physical_plan/expressions/average.rs
##########
@@ -62,6 +62,24 @@ pub fn avg_return_type(arg_type: &DataType) -> Result<DataType> {
     }
 }
 
+pub(crate) fn is_avg_support_arg_type(arg_type: &DataType) -> bool {
+    // TODO support the interval
+    // TODO: do we need to support the unsigned data type?

Review comment:
       got it.







[GitHub] [arrow-datafusion] alamb commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-987376304


   @xudong963  and @liukun4515  what is the status of this PR? Shall we merge it? Is it waiting on more review / work?





[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763690651



##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported

Review comment:
       Maybe we can add the checker in a follow-up pull request.
   In this pull request, we just keep the behavior consistent with the previous logic.
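
As a rough illustration of what such a follow-up checker might look like, here is a minimal sketch. It is not code from this PR: the `DataType` enum below is a simplified local stand-in for arrow's type (the real `Struct` and `Map` variants carry field information), the helper `is_min_max_supported` is hypothetical, and the error type is a plain `String` rather than `DataFusionError`.

```rust
// Simplified local stand-in for arrow's `DataType` (an assumption for this
// sketch; the real `Struct` and `Map` variants carry field information).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    Dictionary(Box<DataType>, Box<DataType>),
    Struct,
    Map,
}

// Hypothetical checker: reject the complex types that the TODO names as
// unsupported for MIN and MAX.
fn is_min_max_supported(data_type: &DataType) -> bool {
    !matches!(data_type, DataType::Struct | DataType::Map)
}

// Same shape as the PR's `get_min_max_result_type`, with the check added.
// Like the original, it assumes the argument count was validated earlier,
// so `input_types[0]` exists.
fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>, String> {
    // Unpack a dictionary to its value type; pass other types through.
    let coerced = match &input_types[0] {
        DataType::Dictionary(_, value_type) => vec![value_type.as_ref().clone()],
        _ => input_types.to_vec(),
    };
    if is_min_max_supported(&coerced[0]) {
        Ok(coerced)
    } else {
        Err(format!("MIN and MAX do not support {:?}", coerced[0]))
    }
}
```

Checking after the dictionary is unpacked means a dictionary whose value type is complex is rejected as well, which covers both TODO comments in the function.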







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763703628



##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",

Review comment:
       good catch

##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",

Review comment:
       good catch







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763714666



##########
File path: datafusion/src/execution/context.rs
##########
@@ -2058,7 +2058,7 @@ mod tests {
         .await
         .unwrap_err();
 
-        assert_eq!(results.to_string(), "Error during planning: Coercion from [Timestamp(Nanosecond, None)] to the signature Uniform(1, [Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64]) failed.");
+        assert_eq!(results.to_string(), "Error during planning: The function Sum do not support the Timestamp(Nanosecond, None).");

Review comment:
       In some SQL systems, if the input has an incompatible data type, they just raise an error or exception and do not list the compatible data types.
   We can refine this later if users need to see the supported data types.
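
If that refinement is ever wanted, one possible shape is sketched below. This is only an illustration: the function name, its string-based parameters, and the message wording are all assumptions, not part of this PR or of DataFusion's API.

```rust
// Hypothetical helper that builds a planning error message which also lists
// the supported input types. All names and the wording are illustrative only.
fn unsupported_type_message(fun: &str, got: &str, supported: &[&str]) -> String {
    format!(
        "The function {} does not support inputs of type {}. Supported types: {}.",
        fun,
        got,
        supported.join(", ")
    )
}
```

This keeps the short, function-specific message introduced here while restoring the list of accepted types that the old signature-based message exposed.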







[GitHub] [arrow-datafusion] houqp commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r760770579



##########
File path: datafusion/src/execution/context.rs
##########
@@ -2058,7 +2058,7 @@ mod tests {
         .await
         .unwrap_err();
 
-        assert_eq!(results.to_string(), "Error during planning: Coercion from [Timestamp(Nanosecond, None)] to the signature Uniform(1, [Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64]) failed.");
+        assert_eq!(results.to_string(), "Error during planning: The function Sum do not support the Timestamp(Nanosecond, None).");

Review comment:
       I agree with this :+1: 







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763691880



##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported
+        // For example, the `Struct` and `Map` type are not supported in the MIN and MAX function
+        _ => Ok(input_types.to_vec()),
+    }
+}
+
+pub fn coerce_exprs(
+    agg_fun: &AggregateFunction,
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    schema: &Schema,
+    signature: &Signature,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    if input_exprs.is_empty() {
+        return Ok(vec![]);
+    }
+    let input_types = input_exprs
+        .iter()
+        .map(|e| e.data_type(schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    // get the coerced data types
+    let coerced_types = coerce_types(agg_fun, &input_types, signature)?;
+
+    // try cast if need
+    input_exprs
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| try_cast(expr.clone(), schema, coerced_types[i].clone()))
+        .collect::<Result<Vec<_>>>()
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::physical_plan::aggregates;
+    use crate::physical_plan::aggregates::{signature, AggregateFunction};
+    use crate::physical_plan::coercion_rule::aggregate_rule::coerce_types;
+    use arrow::datatypes::DataType;
+
+    #[test]
+    fn test_aggregate_coerce_types() {
+        // test input args with error number input types
+        let fun = AggregateFunction::Min;
+        let input_types = vec![DataType::Int64, DataType::Int32];
+        let signature = signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!("Error during planning: The function Min expect argument number is 1, but the input argument number is 2", result.unwrap_err().to_string());
+
+        // test input args is invalid data type for sum or avg
+        let fun = AggregateFunction::Sum;
+        let input_types = vec![DataType::Utf8];
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Sum do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+        let fun = AggregateFunction::Avg;
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Avg do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+
+        // test count, array_agg, approx_distinct, min, max.
+        // the coerced types is same with input types
+        let funs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+            AggregateFunction::Min,
+            AggregateFunction::Max,
+        ];
+        let input_types = vec![
+            vec![DataType::Int32],
+            // vec![DataType::Decimal(10, 2)],
+            vec![DataType::Utf8],
+        ];
+        for fun in funs {
+            for input_type in &input_types {
+                let signature = aggregates::signature(&fun);
+                let result = coerce_types(&fun, input_type, &signature);
+                assert_eq!(*input_type, result.unwrap());
+            }
+        }
+        // test sum, avg
+        let funs = vec![AggregateFunction::Sum, AggregateFunction::Avg];
+        let input_types = vec![
+            vec![DataType::Int32],
+            vec![DataType::Float32],
+            // vec![DataType::Decimal(20, 3)],

Review comment:
       I will add support for the decimal data type in the SUM/AVG functions in the future: https://github.com/apache/arrow-datafusion/pull/1408.
   I have added a TODO to mark it.







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763728811



##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported
+        // For example, the `Struct` and `Map` type are not supported in the MIN and MAX function
+        _ => Ok(input_types.to_vec()),
+    }
+}
+
+pub fn coerce_exprs(
+    agg_fun: &AggregateFunction,
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    schema: &Schema,
+    signature: &Signature,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    if input_exprs.is_empty() {
+        return Ok(vec![]);
+    }
+    let input_types = input_exprs
+        .iter()
+        .map(|e| e.data_type(schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    // get the coerced data types
+    let coerced_types = coerce_types(agg_fun, &input_types, signature)?;
+
+    // try cast if need
+    input_exprs
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| try_cast(expr.clone(), schema, coerced_types[i].clone()))
+        .collect::<Result<Vec<_>>>()

Review comment:
       great suggestions







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763776025



##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -60,6 +60,7 @@ pub mod helpers {
 
 pub use approx_distinct::ApproxDistinct;
 pub use array_agg::ArrayAgg;
+pub(crate) use average::is_avg_support_arg_type;

Review comment:
       @alamb 
   But `is_avg_support_arg_type` is also used in other modules, for example the `coercion_rule` module:
   https://github.com/apache/arrow-datafusion/blob/42b219215f1962bbf529c06392f5c7c978f70cc4/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs#L75
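
   As a rough illustration of why the `pub(crate)` re-export is needed, here is a minimal sketch. The module names only mirror the ones in this PR; the string-based type check and `check_avg_arg` are hypothetical stand-ins, not DataFusion code (the real helper takes an Arrow `DataType`).

```rust
// Hypothetical, simplified layout; only the visibility pattern matters here.
mod expressions {
    mod average {
        // Stand-in for the real helper, which takes an Arrow `DataType`.
        pub fn is_avg_support_arg_type(type_name: &str) -> bool {
            matches!(type_name, "Int64" | "UInt64" | "Float64")
        }
    }

    // Re-exported for use inside this crate only; not part of the public API.
    pub(crate) use average::is_avg_support_arg_type;
}

mod coercion_rule {
    // A sibling module reaches the helper through the crate-visible re-export.
    use super::expressions::is_avg_support_arg_type;

    // Hypothetical caller, standing in for the aggregate coercion rule.
    pub fn check_avg_arg(type_name: &str) -> Result<(), String> {
        if is_avg_support_arg_type(type_name) {
            Ok(())
        } else {
            Err(format!("The function Avg does not support {}", type_name))
        }
    }
}
```

   Without the re-export, `average` stays private to `expressions` and the `use` in `coercion_rule` would not compile.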







[GitHub] [arrow-datafusion] alamb merged pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387


   





[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763702503



##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));

Review comment:
       Aggregate functions currently accept only the `Uniform` and `Any` signatures.
   I changed the message to `Aggregate functions do not support this {:?}`.
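
   A minimal sketch of that check, assuming a cut-down `TypeSignature` with only three variants (the real enum has more, and the real code formats the whole `Signature`). `expected_arg_count` is a hypothetical helper name used only for illustration.

```rust
// Simplified stand-in for DataFusion's `TypeSignature`.
#[allow(dead_code)]
#[derive(Debug)]
enum TypeSignature {
    Uniform(usize, Vec<String>),
    Any(usize),
    Exact(Vec<String>),
}

/// Returns the expected argument count for the two signature kinds that
/// aggregate functions accept, and an error for every other kind.
fn expected_arg_count(signature: &TypeSignature) -> Result<usize, String> {
    match signature {
        TypeSignature::Uniform(count, _) | TypeSignature::Any(count) => Ok(*count),
        other => Err(format!(
            "Aggregate functions do not support this {:?}",
            other
        )),
    }
}
```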
   







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763720460



##########
File path: datafusion/tests/sql.rs
##########
@@ -5612,10 +5612,9 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> {
     let mut ctx = ExecutionContext::new();
     register_aggregate_csv(&mut ctx).await?;
     let sql = "SELECT COUNT(DISTINCT) FROM aggregate_test_100";
-    let logical_plan = ctx.create_logical_plan(sql)?;
-    let physical_plan = ctx.create_physical_plan(&logical_plan).await;
-    let err = physical_plan.unwrap_err();
-    assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'");
+    let logical_plan = ctx.create_logical_plan(sql);
+    let err = logical_plan.unwrap_err();
+    assert_eq!(err.to_string(), DataFusionError::Plan("The function Count expect argument number is 1, but the input argument number is 0".to_string()).to_string());

Review comment:
       Once this comment is addressed, the message will change to `Error during planning: The function Count expects 1 arguments, but 0 were provided`:
   https://github.com/apache/arrow-datafusion/pull/1387/files#r760575015
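
   For reference, a small sketch of how the reworded message could be produced. `check_arg_count` is a hypothetical helper that takes plain values; the real code works on `AggregateFunction` and `Signature`, and the `Error during planning: ` prefix comes from the `DataFusionError::Plan` display, as the assertions in this PR show.

```rust
/// Compares the number of provided arguments with the expected count and
/// builds the reworded message when they differ.
fn check_arg_count(fun_name: &str, expected: usize, actual: usize) -> Result<(), String> {
    if actual != expected {
        return Err(format!(
            "The function {} expects {} arguments, but {} were provided",
            fun_name, expected, actual
        ));
    }
    Ok(())
}
```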







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r764459996



##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -262,6 +266,199 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
 mod tests {
     use super::*;
     use crate::error::Result;
+    use crate::physical_plan::expressions::{
+        ApproxDistinct, ArrayAgg, Avg, Count, Max, Min, Sum,
+    };
+
+    #[test]
+    fn test_count_arragg_approx_expr() -> Result<()> {
+        let funcs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+        ];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Count => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Count>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ApproxDistinct => {
+                        assert!(result_agg_phy_exprs.as_any().is::<ApproxDistinct>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, false),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ArrayAgg => {
+                        assert!(result_agg_phy_exprs.as_any().is::<ArrayAgg>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new(
+                                "c1",
+                                DataType::List(Box::new(Field::new(
+                                    "item",
+                                    data_type.clone(),
+                                    true
+                                ))),
+                                false
+                            ),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_min_max_expr() -> Result<()> {
+        let funcs = vec![AggregateFunction::Min, AggregateFunction::Max];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Min => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Min>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::Max => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Max>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_sum_avg_expr() -> Result<()> {
+        let funcs = vec![AggregateFunction::Sum, AggregateFunction::Avg];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::UInt64,
+            DataType::Int32,
+            DataType::Int64,
+            DataType::Float32,
+            DataType::Float64,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Sum => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Sum>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        let mut expect_type = data_type.clone();
+                        if matches!(
+                            data_type,
+                            DataType::UInt8
+                                | DataType::UInt16
+                                | DataType::UInt32
+                                | DataType::UInt64
+                        ) {
+                            expect_type = DataType::UInt64;
+                        } else if matches!(
+                            data_type,
+                            DataType::Int8
+                                | DataType::Int16
+                                | DataType::Int32
+                                | DataType::Int64
+                        ) {
+                            expect_type = DataType::Int64;
+                        } else if matches!(
+                            data_type,
+                            DataType::Float32 | DataType::Float64
+                        ) {
+                            expect_type = data_type.clone();
+                        }

Review comment:
       good style and suggestion!
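
   The suggestion itself is not quoted in this message. One way the `if`/`else if` chain above could be condensed is a single `match`; the sketch below assumes a simplified `DataType` enum in place of the Arrow one, and `expected_sum_type` is a hypothetical helper name.

```rust
// Minimal stand-in for the Arrow `DataType` variants used in the test.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    UInt8,
    UInt16,
    UInt32,
    UInt64,
    Int8,
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
}

/// Maps an input type to the result type the test expects from `Sum`.
fn expected_sum_type(data_type: &DataType) -> DataType {
    match data_type {
        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
            DataType::UInt64
        }
        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
            DataType::Int64
        }
        // Floats keep their own type, as in the test above.
        DataType::Float32 | DataType::Float64 => data_type.clone(),
    }
}
```

   The `match` is exhaustive over this small enum, so the compiler flags any variant the mapping forgets; with the full Arrow enum a fallback arm would be needed.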







[GitHub] [arrow-datafusion] alamb commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-984642123


   > I can remain this pull request for a while and don't merge it quickly.
   
   I think that is a good idea @liukun4515  -- though the danger of doing so is that we will end up with merge conflicts
   
   Perhaps @Dandandan  @houqp  @rdettai or @Jimexist  would like to weigh in on the structure and direction as well





[GitHub] [arrow-datafusion] liukun4515 commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-984260169


   > Thanks @liukun4515 I really like where this is heading and it makes sense to me to have aggregate coercion rules that are separate from coercion rules for other functions.
   > 
   > I am curious how you see the overall code structure evolving. Is your idea that there will be several coercion modules, for each different Expr type, for example?
   > 
   > ```
   > datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
   > datafusion/src/physical_plan/coercion_rule/function.rs
   > datafusion/src/physical_plan/coercion_rule/operators.rs
   > ```
   
   Yes, that is the intended end state: each type of expression will have its own coercion rules in its own file.
   The code structure will look like the example you provided.
   
   > I had a few suggestions / comments, but overall I think this is looking almost ready to merge
   > 
   > Thank you for starting this work
   
   





[GitHub] [arrow-datafusion] alamb commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-988301837


   Thanks also to @xudong963  for the review





[GitHub] [arrow-datafusion] houqp commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-988548289


   Thank you @liukun4515 for your contribution on this :)





[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763698381



##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",

Review comment:
       good suggestion.
   Done







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763706078



##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {
+        DataType::Dictionary(_, dict_value_type) => {
+            // TODO add checker, if the value type is complex data type
+            Ok(vec![dict_value_type.deref().clone()])
+        }
+        // TODO add checker for datatype which min and max supported
+        // For example, the `Struct` and `Map` type are not supported in the MIN and MAX function
+        _ => Ok(input_types.to_vec()),
+    }
+}
+
+pub fn coerce_exprs(
+    agg_fun: &AggregateFunction,
+    input_exprs: &[Arc<dyn PhysicalExpr>],
+    schema: &Schema,
+    signature: &Signature,
+) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+    if input_exprs.is_empty() {
+        return Ok(vec![]);
+    }
+    let input_types = input_exprs
+        .iter()
+        .map(|e| e.data_type(schema))
+        .collect::<Result<Vec<_>>>()?;
+
+    // get the coerced data types
+    let coerced_types = coerce_types(agg_fun, &input_types, signature)?;
+
+    // try cast if need
+    input_exprs
+        .iter()
+        .enumerate()
+        .map(|(i, expr)| try_cast(expr.clone(), schema, coerced_types[i].clone()))
+        .collect::<Result<Vec<_>>>()
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::physical_plan::aggregates;
+    use crate::physical_plan::aggregates::{signature, AggregateFunction};
+    use crate::physical_plan::coercion_rule::aggregate_rule::coerce_types;
+    use arrow::datatypes::DataType;
+
+    #[test]
+    fn test_aggregate_coerce_types() {
+        // test input args with error number input types
+        let fun = AggregateFunction::Min;
+        let input_types = vec![DataType::Int64, DataType::Int32];
+        let signature = signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!("Error during planning: The function Min expect argument number is 1, but the input argument number is 2", result.unwrap_err().to_string());
+
+        // test input args is invalid data type for sum or avg
+        let fun = AggregateFunction::Sum;
+        let input_types = vec![DataType::Utf8];
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Sum do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+        let fun = AggregateFunction::Avg;
+        let signature = aggregates::signature(&fun);
+        let result = coerce_types(&fun, &input_types, &signature);
+        assert_eq!(
+            "Error during planning: The function Avg do not support the Utf8.",
+            result.unwrap_err().to_string()
+        );
+
+        // test count, array_agg, approx_distinct, min, max.
+        // the coerced types is same with input types
+        let funs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+            AggregateFunction::Min,
+            AggregateFunction::Max,
+        ];
+        let input_types = vec![
+            vec![DataType::Int32],
+            // vec![DataType::Decimal(10, 2)],

Review comment:
       Yes, min and max do not support the decimal data type until https://github.com/apache/arrow-datafusion/pull/1407 is merged.
   I will add a TODO to mark it.







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763776025



##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -60,6 +60,7 @@ pub mod helpers {
 
 pub use approx_distinct::ApproxDistinct;
 pub use array_agg::ArrayAgg;
+pub(crate) use average::is_avg_support_arg_type;

Review comment:
       But `is_avg_support_arg_type` will be used in other modules, for example the `coercion_rule` module.
   https://github.com/apache/arrow-datafusion/blob/42b219215f1962bbf529c06392f5c7c978f70cc4/datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs#L75
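
A minimal sketch of why the `pub(crate)` re-export is enough for this use. The `DataType` enum and the module bodies below are simplified stand-ins (assumptions, not the real DataFusion code); only the helper name and the module names come from the diff.

```rust
/// Simplified stand-in for arrow's `DataType` (assumption for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Int32,
    Float64,
    Utf8,
}

mod expressions {
    mod average {
        use super::super::DataType;

        // Visible anywhere inside the crate, but not part of the public API.
        pub(crate) fn is_avg_support_arg_type(arg_type: &DataType) -> bool {
            matches!(arg_type, DataType::Int32 | DataType::Float64)
        }
    }

    // Re-export so sibling modules can use the helper while `average` stays private.
    pub(crate) use average::is_avg_support_arg_type;
}

mod coercion_rule {
    use super::expressions::is_avg_support_arg_type;
    use super::DataType;

    // Hypothetical caller, standing in for the aggregate coercion rule.
    pub fn avg_accepts(arg_type: &DataType) -> bool {
        is_avg_support_arg_type(arg_type)
    }
}

fn main() {
    assert!(coercion_rule::avg_accepts(&DataType::Float64));
    assert!(!coercion_rule::avg_accepts(&DataType::Utf8));
}
```

With this layout the helper is reachable from `coercion_rule` but is not exported to users of the crate.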







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763689688



##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(
+    agg_fun: &AggregateFunction,
+    input_types: &[DataType],
+    signature: &Signature,
+) -> Result<Vec<DataType>> {
+    match signature.type_signature {
+        TypeSignature::Uniform(agg_count, _) | TypeSignature::Any(agg_count) => {
+            if input_types.len() != agg_count {
+                return Err(DataFusionError::Plan(format!("The function {:?} expect argument number is {:?}, but the input argument number is {:?}",
+                                                         agg_fun, agg_count, input_types.len())));
+            }
+        }
+        _ => {
+            return Err(DataFusionError::Plan(format!(
+                "The aggregate coercion rule don't support this {:?}",
+                signature
+            )));
+        }
+    };
+    match agg_fun {
+        AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::ArrayAgg => Ok(input_types.to_vec()),
+        AggregateFunction::Min | AggregateFunction::Max => {
+            // min and max support the dictionary data type
+            // unpack the dictionary to get the value
+            get_min_max_result_type(input_types)
+        }
+        AggregateFunction::Sum => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval.
+            if !is_sum_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+        AggregateFunction::Avg => {
+            // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc
+            // smallint, int, bigint, real, double precision, decimal, or interval
+            if !is_avg_support_arg_type(&input_types[0]) {
+                return Err(DataFusionError::Plan(format!(
+                    "The function {:?} do not support the {:?}.",
+                    agg_fun, input_types[0]
+                )));
+            }
+            Ok(input_types.to_vec())
+        }
+    }
+}
+
+fn get_min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>> {
+    // min and max support the dictionary data type
+    // unpack the dictionary to get the value
+    match &input_types[0] {

Review comment:
       Done, added the `assert_eq!`.
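
A rough sketch of the dictionary unpacking that the comments in the hunk describe, with a length check before indexing. The body of `get_min_max_result_type` is not shown in the quoted diff, so the logic below is an assumption; `DataType` is a simplified stand-in for arrow's type and `min_max_result_type` is a hypothetical name.

```rust
/// Simplified stand-in for arrow's `DataType` (assumption for this sketch).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    /// Dictionary(key type, value type).
    Dictionary(Box<DataType>, Box<DataType>),
}

/// Hypothetical helper: min/max over a dictionary column yield the value type.
fn min_max_result_type(input_types: &[DataType]) -> Result<Vec<DataType>, String> {
    // The arity is expected to have been validated by the caller;
    // the assertion guards the index below.
    assert_eq!(input_types.len(), 1);
    match &input_types[0] {
        // Unpack the dictionary to get the value type.
        DataType::Dictionary(_, value_type) => Ok(vec![value_type.as_ref().clone()]),
        // Every other type is passed through unchanged.
        _ => Ok(input_types.to_vec()),
    }
}

fn main() {
    let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    assert_eq!(min_max_result_type(&[dict]), Ok(vec![DataType::Utf8]));
    assert_eq!(
        min_max_result_type(&[DataType::Int32]),
        Ok(vec![DataType::Int32])
    );
}
```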







[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r764404370



##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -262,6 +266,199 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
 mod tests {
     use super::*;
     use crate::error::Result;
+    use crate::physical_plan::expressions::{
+        ApproxDistinct, ArrayAgg, Avg, Count, Max, Min, Sum,
+    };
+
+    #[test]
+    fn test_count_arragg_approx_expr() -> Result<()> {
+        let funcs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+        ];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Count => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Count>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ApproxDistinct => {
+                        assert!(result_agg_phy_exprs.as_any().is::<ApproxDistinct>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, false),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ArrayAgg => {
+                        assert!(result_agg_phy_exprs.as_any().is::<ArrayAgg>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new(
+                                "c1",
+                                DataType::List(Box::new(Field::new(
+                                    "item",
+                                    data_type.clone(),
+                                    true
+                                ))),
+                                false
+                            ),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_min_max_expr() -> Result<()> {
+        let funcs = vec![AggregateFunction::Min, AggregateFunction::Max];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Min => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Min>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::Max => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Max>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_sum_avg_expr() -> Result<()> {
+        let funcs = vec![AggregateFunction::Sum, AggregateFunction::Avg];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::UInt64,
+            DataType::Int32,
+            DataType::Int64,
+            DataType::Float32,
+            DataType::Float64,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Sum => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Sum>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        let mut expect_type = data_type.clone();
+                        if matches!(
+                            data_type,
+                            DataType::UInt8
+                                | DataType::UInt16
+                                | DataType::UInt32
+                                | DataType::UInt64
+                        ) {
+                            expect_type = DataType::UInt64;
+                        } else if matches!(
+                            data_type,
+                            DataType::Int8
+                                | DataType::Int16
+                                | DataType::Int32
+                                | DataType::Int64
+                        ) {
+                            expect_type = DataType::Int64;
+                        } else if matches!(
+                            data_type,
+                            DataType::Float32 | DataType::Float64
+                        ) {
+                            expect_type = data_type.clone();
+                        }

Review comment:
       https://github.com/apache/arrow-datafusion/pull/1416 <-- PR with proposed cleanup
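
For illustration only, one possible shape for simplifying the `expect_type` chain in the hunk above: a single `match` instead of the mutable variable and three `matches!` branches. This is not taken from the linked PR; `DataType` is a simplified stand-in for arrow's type and `expected_sum_type` is a hypothetical helper name.

```rust
/// Simplified stand-in for arrow's `DataType` (assumption for this sketch).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    UInt8,
    UInt16,
    UInt32,
    UInt64,
    Int8,
    Int16,
    Int32,
    Int64,
    Float32,
    Float64,
}

/// Hypothetical helper: the result type the test expects from Sum.
fn expected_sum_type(data_type: &DataType) -> DataType {
    match data_type {
        // Unsigned integers are widened to UInt64.
        DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
            DataType::UInt64
        }
        // Signed integers are widened to Int64.
        DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
            DataType::Int64
        }
        // Floats keep their own type, as in the original test.
        other => other.clone(),
    }
}

fn main() {
    assert_eq!(expected_sum_type(&DataType::UInt32), DataType::UInt64);
    assert_eq!(expected_sum_type(&DataType::Int32), DataType::Int64);
    assert_eq!(expected_sum_type(&DataType::Float32), DataType::Float32);
}
```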







[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763650471



##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -262,6 +266,131 @@ pub fn signature(fun: &AggregateFunction) -> Signature {
 mod tests {
     use super::*;
     use crate::error::Result;
+    use crate::physical_plan::expressions::{ApproxDistinct, ArrayAgg, Count, Max, Min};
+
+    #[test]
+    fn test_count_arragg_approx_expr() -> Result<()> {
+        let funcs = vec![
+            AggregateFunction::Count,
+            AggregateFunction::ArrayAgg,
+            AggregateFunction::ApproxDistinct,
+        ];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Count => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Count>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ApproxDistinct => {
+                        assert!(result_agg_phy_exprs.as_any().is::<ApproxDistinct>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", DataType::UInt64, false),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::ArrayAgg => {
+                        assert!(result_agg_phy_exprs.as_any().is::<ArrayAgg>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new(
+                                "c1",
+                                DataType::List(Box::new(Field::new(
+                                    "item",
+                                    data_type.clone(),
+                                    true
+                                ))),
+                                false
+                            ),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_min_max_expr() -> Result<()> {
+        let funcs = vec![AggregateFunction::Min, AggregateFunction::Max];
+        let data_types = vec![
+            DataType::UInt32,
+            DataType::Int32,
+            DataType::Float32,
+            DataType::Float64,
+            DataType::Decimal(10, 2),
+            DataType::Utf8,
+        ];
+        for fun in funcs {
+            for data_type in &data_types {
+                let input_schema =
+                    Schema::new(vec![Field::new("c1", data_type.clone(), true)]);
+                let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(
+                    expressions::Column::new_with_schema("c1", &input_schema).unwrap(),
+                )];
+                let result_agg_phy_exprs = create_aggregate_expr(
+                    &fun,
+                    false,
+                    &input_phy_exprs[0..1],
+                    &input_schema,
+                    "c1",
+                )?;
+                match fun {
+                    AggregateFunction::Min => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Min>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    AggregateFunction::Max => {
+                        assert!(result_agg_phy_exprs.as_any().is::<Max>());
+                        assert_eq!("c1", result_agg_phy_exprs.name());
+                        assert_eq!(
+                            Field::new("c1", data_type.clone(), true),
+                            result_agg_phy_exprs.field().unwrap()
+                        );
+                    }
+                    _ => {}
+                };
+            }
+        }
+        Ok(())
+    }
+
+    #[test]
+    fn test_sum_avg_expr() -> Result<()> {
+        // TODO

Review comment:
       Yes, I have filled in the sum/avg test.







[GitHub] [arrow-datafusion] liukun4515 commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-987511968


   > @xudong963 and @liukun4515 what is the status of this PR? Shall we merge it? Is it waiting on more review / work?
   
   @alamb 
   I have been waiting for review comments over the past few days.
   Starting today, I will address all of the comments.
   Once they are addressed, I will ask someone to re-review.





[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#discussion_r763797242



##########
File path: datafusion/src/physical_plan/coercion_rule/aggregate_rule.rs
##########
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Support the coercion rule for aggregate function.
+
+use crate::arrow::datatypes::Schema;
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::aggregates::AggregateFunction;
+use crate::physical_plan::expressions::{
+    is_avg_support_arg_type, is_sum_support_arg_type, try_cast,
+};
+use crate::physical_plan::functions::{Signature, TypeSignature};
+use crate::physical_plan::PhysicalExpr;
+use arrow::datatypes::DataType;
+use std::ops::Deref;
+use std::sync::Arc;
+
+pub fn coerce_types(

Review comment:
       Yes, I have added the doc for these two functions.
   







[GitHub] [arrow-datafusion] liukun4515 commented on pull request #1387: refactor the expr coercion[aggregate expr function]

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-983383594


   @alamb  PTAL
   In #1356, we reached the same idea: refactor the coercion framework and split the coercion rules into a separate module.
   This PR covers the type coercion part, and only for aggregate functions.
   If this approach is fine with you and the community, we can continue with the other refactoring work, such as scalar function expressions, UDFs, and UDAFs.
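
A simplified sketch of the shape such a per-function coercion rule takes: first check the argument count, then check the argument type for each function. It follows the structure of `coerce_types` in this PR, but the enums, the `expected_arg_count` helper, and the error strings are stand-ins chosen for this example (assumptions, not the DataFusion API).

```rust
/// Simplified stand-in for arrow's `DataType` (assumption for this sketch).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Int64,
    Float64,
    Utf8,
}

/// Reduced stand-in for the aggregate function enum.
#[derive(Debug, Clone, Copy, PartialEq)]
enum AggregateFunction {
    Count,
    Sum,
}

/// Hypothetical replacement for the argument count carried by the signature.
fn expected_arg_count(_fun: AggregateFunction) -> usize {
    1
}

fn coerce_types(
    fun: AggregateFunction,
    input_types: &[DataType],
) -> Result<Vec<DataType>, String> {
    // Step 1: the number of arguments must match the signature.
    let expected = expected_arg_count(fun);
    if input_types.len() != expected {
        return Err(format!(
            "{:?} expects {} argument(s), got {}",
            fun,
            expected,
            input_types.len()
        ));
    }
    // Step 2: each function decides which input types it accepts.
    match fun {
        AggregateFunction::Count => Ok(input_types.to_vec()),
        AggregateFunction::Sum => match &input_types[0] {
            DataType::Utf8 => Err(format!("{:?} does not support {:?}", fun, input_types[0])),
            _ => Ok(input_types.to_vec()),
        },
    }
}

fn main() {
    assert!(coerce_types(AggregateFunction::Sum, &[DataType::Int64, DataType::Int32]).is_err());
    assert!(coerce_types(AggregateFunction::Sum, &[DataType::Utf8]).is_err());
    assert_eq!(
        coerce_types(AggregateFunction::Count, &[DataType::Utf8]),
        Ok(vec![DataType::Utf8])
    );
}
```

Keeping the rule in its own function means the planner can call it before building the physical expression, and the same two-step structure could be reused for scalar functions later.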
   





[GitHub] [arrow-datafusion] liukun4515 commented on pull request #1387: Add coercion rules for AggregateFunctions

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on pull request #1387:
URL: https://github.com/apache/arrow-datafusion/pull/1387#issuecomment-984239621


   @alamb I hope this refactor can involve more community developers and committers, so that other developers know the context.
   I can leave this pull request open for a while rather than merging it quickly.
   What do you think?

