Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/03 07:10:27 UTC

[GitHub] [beam] robinyqiu opened a new pull request #12766: [BEAM-10459][BEAM-9514] Fix AssertionError type mismatch from AggregateScanConverter

robinyqiu opened a new pull request #12766:
URL: https://github.com/apache/beam/pull/12766


   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] stale[bot] closed pull request #12766: [BEAM-10459][BEAM-9514] Fix AssertionError type mismatch from AggregateScanConverter

Posted by GitBox <gi...@apache.org>.
stale[bot] closed pull request #12766:
URL: https://github.com/apache/beam/pull/12766


   





[GitHub] [beam] amaliujia commented on a change in pull request #12766: [BEAM-10459][BEAM-9514] Fix AssertionError type mismatch from AggregateScanConverter

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12766:
URL: https://github.com/apache/beam/pull/12766#discussion_r487307937



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/AggregateScanConverter.java
##########
@@ -227,8 +224,35 @@ private AggregateCall convertAggCall(
         ZetaSqlCalciteTranslationUtils.toCalciteType(
             computedColumn.getColumn().getType(), nullable, getCluster().getRexBuilder());
 
+    SqlAggFunction sqlAggFunction =
+        getSqlAggFunction(aggregateFunctionCall.getFunction().getName(), returnType);
+
     String aggName = getTrait().resolveAlias(computedColumn.getColumn());
     return AggregateCall.create(
         sqlAggFunction, false, false, false, argList, -1, RelCollations.EMPTY, returnType, aggName);
   }
+
+  private static SqlAggFunction getSqlAggFunction(
+      String zetaSqlAggFunctionName, RelDataType returnType) {
+    // ZetaSQL specific aggregation functions, implemented with a user-defined CombineFn
+    if (ZETASQL_UDAF_OPERATORS.containsKey(zetaSqlAggFunctionName)) {
+      return ZETASQL_UDAF_OPERATORS.get(zetaSqlAggFunctionName);
+    }
+
+    if ("$count_star".equals(zetaSqlAggFunctionName)) {
+      zetaSqlAggFunctionName = "COUNT"; // $count_star and count both map to the same implementation
+    } else {
+      // BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES uses upper-case function names
+      zetaSqlAggFunctionName = zetaSqlAggFunctionName.toUpperCase();
+    }
+
+    // Beam builtin aggregation functions (available in both ZetaSQL and CalciteSQL), implemented in
+    // {@link org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations}
+    if (BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES.containsKey(zetaSqlAggFunctionName)) {

Review comment:
       Is there a need to use the upper case of `zetaSqlAggFunctionName`?
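
The comment in the diff gives one reason for the upper-casing: `BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES` is keyed by upper-case function names. Below is a minimal sketch of that lookup, assuming ZetaSQL reports function names in lower case (as `$count_star` suggests). The class name, the map contents, and the placeholder values are hypothetical stand-ins, not Beam's code. `Locale.ROOT` is an addition that the diff does not use; it keeps `toUpperCase` independent of the JVM's default locale.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public class AggNameLookupSketch {
  // Hypothetical stand-in for BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES:
  // keys are upper-case names; values are placeholder strings, not CombineFn factories.
  static final Map<String, String> BUILTIN = new HashMap<>();

  static {
    BUILTIN.put("COUNT", "count-impl");
    BUILTIN.put("SUM", "sum-impl");
    BUILTIN.put("MAX", "max-impl");
  }

  // Maps a ZetaSQL function name onto the key used by the upper-case map.
  static String normalize(String zetaSqlName) {
    if ("$count_star".equals(zetaSqlName)) {
      return "COUNT"; // $count_star and count share one implementation
    }
    return zetaSqlName.toUpperCase(Locale.ROOT);
  }

  // Returns the placeholder implementation, or empty if the name is unknown.
  static Optional<String> lookup(String zetaSqlName) {
    return Optional.ofNullable(BUILTIN.get(normalize(zetaSqlName)));
  }

  public static void main(String[] args) {
    System.out.println(lookup("sum").isPresent());          // true
    System.out.println(lookup("$count_star").isPresent());  // true
    System.out.println(BUILTIN.containsKey("sum"));         // false: raw lower-case key misses
  }
}
```

Without the normalization step, a lower-case name such as `sum` would miss the map even though `SUM` is registered, which is the case the `else` branch in the diff appears to guard against.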

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -155,6 +156,25 @@ public static SqlFunction createZetaSqlFunction(String name, SqlTypeName returnT
         SqlFunctionCategory.USER_DEFINED_FUNCTION);
   }
 
+  /**
+   * Create a dummy SqlAggFunction of type OTHER_FUNCTION from given function name and return type.
+   * These functions will be executed by Beam CombineFns implemented in {@link
+   * org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations}.
+   */
+  public static SqlAggFunction createZetaSqlAggFunction(String name, RelDataType returnType) {
+    return new SqlAggFunction(
+        name,
+        null, // sqlIdentifier
+        SqlKind.OTHER_FUNCTION,
+        x -> returnType,

Review comment:
       I am not sure whether this is the right way to override the return type of an aggregate operator. Perhaps the best way to evaluate it is to support `AVG` in ZetaSQL: as I recall, ZetaSQL and Calcite return different types for `AVG` when the input parameter type is `INT64`.
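
To make the `AVG` concern concrete, here is a toy model of the type check, not Calcite's API. It assumes, following the reviewer's recollection, that ZetaSQL declares a floating-point result for `AVG(INT64)` while a planner default keeps the argument type. `SqlType`, `ARGUMENT_TYPE_INFERENCE`, `fixedInference`, and `matches` are hypothetical names; `fixedInference` imitates the `x -> returnType` lambda from the diff.

```java
import java.util.function.UnaryOperator;

public class AvgReturnTypeSketch {
  // Simplified stand-in for SQL types; real planners use richer type objects.
  enum SqlType { INT64, DOUBLE }

  // Assumed planner default for AVG: the result type equals the argument type.
  static final UnaryOperator<SqlType> ARGUMENT_TYPE_INFERENCE = argType -> argType;

  // Imitates `x -> returnType`: ignores the argument and returns a fixed type.
  static UnaryOperator<SqlType> fixedInference(SqlType returnType) {
    return ignoredArgType -> returnType;
  }

  // True when the inferred type agrees with the type declared on the aggregate call.
  static boolean matches(UnaryOperator<SqlType> inference, SqlType argType, SqlType declared) {
    return inference.apply(argType) == declared;
  }

  public static void main(String[] args) {
    // Assumption: ZetaSQL declares DOUBLE for AVG over an INT64 column.
    SqlType declared = SqlType.DOUBLE;

    // Argument-type inference yields INT64, so the check fails (a type mismatch).
    System.out.println(matches(ARGUMENT_TYPE_INFERENCE, SqlType.INT64, declared)); // false

    // A fixed inference always agrees with the declared type.
    System.out.println(matches(fixedInference(declared), SqlType.INT64, declared)); // true
  }
}
```

In this model the fixed inference always passes the check, because it simply repeats whatever type was declared. Whether that is the right place to perform the override in the real planner is the open question the review comment raises.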




[GitHub] [beam] stale[bot] commented on pull request #12766: [BEAM-10459][BEAM-9514] Fix AssertionError type mismatch from AggregateScanConverter

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #12766:
URL: https://github.com/apache/beam/pull/12766#issuecomment-734494788


   This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
   





[GitHub] [beam] stale[bot] commented on pull request #12766: [BEAM-10459][BEAM-9514] Fix AssertionError type mismatch from AggregateScanConverter

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #12766:
URL: https://github.com/apache/beam/pull/12766#issuecomment-727715781


   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
   








[GitHub] [beam] amaliujia commented on a change in pull request #12766: [BEAM-10459][BEAM-9514] Fix AssertionError type mismatch from AggregateScanConverter

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12766:
URL: https://github.com/apache/beam/pull/12766#discussion_r487308868



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -155,6 +156,25 @@ public static SqlFunction createZetaSqlFunction(String name, SqlTypeName returnT
         SqlFunctionCategory.USER_DEFINED_FUNCTION);
   }
 
+  /**
+   * Create a dummy SqlAggFunction of type OTHER_FUNCTION from given function name and return type.
+   * These functions will be executed by Beam CombineFns implemented in {@link
+   * org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations}.
+   */
+  public static SqlAggFunction createZetaSqlAggFunction(String name, RelDataType returnType) {
+    return new SqlAggFunction(
+        name,
+        null, // sqlIdentifier
+        SqlKind.OTHER_FUNCTION,
+        x -> returnType,

Review comment:
       I am not sure whether this is the right way to override the return type of an aggregate operator. Perhaps the best way to evaluate it is to support `AVG` in ZetaSQL: as I recall, ZetaSQL and Calcite return different types for `AVG` when the input parameter type is `INT64`.







[GitHub] [beam] amaliujia commented on a change in pull request #12766: [BEAM-10459][BEAM-9514] Fix AssertionError type mismatch from AggregateScanConverter

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12766:
URL: https://github.com/apache/beam/pull/12766#discussion_r487307937



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/AggregateScanConverter.java
##########
@@ -227,8 +224,35 @@ private AggregateCall convertAggCall(
         ZetaSqlCalciteTranslationUtils.toCalciteType(
             computedColumn.getColumn().getType(), nullable, getCluster().getRexBuilder());
 
+    SqlAggFunction sqlAggFunction =
+        getSqlAggFunction(aggregateFunctionCall.getFunction().getName(), returnType);
+
     String aggName = getTrait().resolveAlias(computedColumn.getColumn());
     return AggregateCall.create(
         sqlAggFunction, false, false, false, argList, -1, RelCollations.EMPTY, returnType, aggName);
   }
+
+  private static SqlAggFunction getSqlAggFunction(
+      String zetaSqlAggFunctionName, RelDataType returnType) {
+    // ZetaSQL specific aggregation functions, implemented with a user-defined CombineFn
+    if (ZETASQL_UDAF_OPERATORS.containsKey(zetaSqlAggFunctionName)) {
+      return ZETASQL_UDAF_OPERATORS.get(zetaSqlAggFunctionName);
+    }
+
+    if ("$count_star".equals(zetaSqlAggFunctionName)) {
+      zetaSqlAggFunctionName = "COUNT"; // $count_star and count both map to the same implementation
+    } else {
+      // BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES uses upper-case function names
+      zetaSqlAggFunctionName = zetaSqlAggFunctionName.toUpperCase();
+    }
+
+    // Beam builtin aggregation functions (available in both ZetaSQL and CalciteSQL), implemented in
+    // {@link org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations}
+    if (BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES.containsKey(zetaSqlAggFunctionName)) {

Review comment:
       Is there a need to use the upper case of `zetaSqlAggFunctionName`?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -155,6 +156,25 @@ public static SqlFunction createZetaSqlFunction(String name, SqlTypeName returnT
         SqlFunctionCategory.USER_DEFINED_FUNCTION);
   }
 
+  /**
+   * Create a dummy SqlAggFunction of type OTHER_FUNCTION from given function name and return type.
+   * These functions will be executed by Beam CombineFns implemented in {@link
+   * org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations}.
+   */
+  public static SqlAggFunction createZetaSqlAggFunction(String name, RelDataType returnType) {
+    return new SqlAggFunction(
+        name,
+        null, // sqlIdentifier
+        SqlKind.OTHER_FUNCTION,
+        x -> returnType,

Review comment:
       I am not sure whether this is the right trigger to override the return type of an agg operator. Maybe the best to evaluate it is to support `AVG` in ZetaSQL. As I recall the return type is different for ZetaSQL and Calcite when input parameter type is `INT64` for `AVG`.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/AggregateScanConverter.java
##########
@@ -227,8 +224,35 @@ private AggregateCall convertAggCall(
         ZetaSqlCalciteTranslationUtils.toCalciteType(
             computedColumn.getColumn().getType(), nullable, getCluster().getRexBuilder());
 
+    SqlAggFunction sqlAggFunction =
+        getSqlAggFunction(aggregateFunctionCall.getFunction().getName(), returnType);
+
     String aggName = getTrait().resolveAlias(computedColumn.getColumn());
     return AggregateCall.create(
         sqlAggFunction, false, false, false, argList, -1, RelCollations.EMPTY, returnType, aggName);
   }
+
+  private static SqlAggFunction getSqlAggFunction(
+      String zetaSqlAggFunctionName, RelDataType returnType) {
+    // ZetaSQL specific aggregation functions, implemented with a user-defined CombineFn
+    if (ZETASQL_UDAF_OPERATORS.containsKey(zetaSqlAggFunctionName)) {
+      return ZETASQL_UDAF_OPERATORS.get(zetaSqlAggFunctionName);
+    }
+
+    if ("$count_star".equals(zetaSqlAggFunctionName)) {
+      zetaSqlAggFunctionName = "COUNT"; // $count_star and count both map to the same implementation
+    } else {
+      // BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES uses upper-case function names
+      zetaSqlAggFunctionName = zetaSqlAggFunctionName.toUpperCase();
+    }
+
+    // Beam builtin aggregation functions (available in both ZetaSQL and CalciteSQL), implemented in
+    // {@link org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations}
+    if (BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES.containsKey(zetaSqlAggFunctionName)) {

Review comment:
       Is there a need to use the upper case of `zetaSqlAggFunctionName`?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -155,6 +156,25 @@ public static SqlFunction createZetaSqlFunction(String name, SqlTypeName returnT
         SqlFunctionCategory.USER_DEFINED_FUNCTION);
   }
 
+  /**
+   * Create a dummy SqlAggFunction of type OTHER_FUNCTION from given function name and return type.
+   * These functions will be executed by Beam CombineFns implemented in {@link
+   * org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations}.
+   */
+  public static SqlAggFunction createZetaSqlAggFunction(String name, RelDataType returnType) {
+    return new SqlAggFunction(
+        name,
+        null, // sqlIdentifier
+        SqlKind.OTHER_FUNCTION,
+        x -> returnType,

Review comment:
       I am not sure whether this is the right trigger to override the return type of an agg operator. Maybe the best to evaluate it is to support `AVG` in ZetaSQL. As I recall the return type is different for ZetaSQL and Calcite when input parameter type is `INT64` for `AVG`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robinyqiu commented on pull request #12766: [BEAM-10459][BEAM-9514] Fix AssertionError type mismatch from AggregateScanConverter

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on pull request #12766:
URL: https://github.com/apache/beam/pull/12766#issuecomment-686789357


   Tested internally: some previously failing tests now pass, and no regressions were found.

