Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/04 07:19:41 UTC

[GitHub] [beam] sonam-vend opened a new pull request #13483: Implemeted ARRAY_AGG fn for Zetasql dialect

sonam-vend opened a new pull request #13483:
URL: https://github.com/apache/beam/pull/13483




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib commented on a change in pull request #13483: Implemeted ARRAY_AGG fn for Zetasql dialect

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13483:
URL: https://github.com/apache/beam/pull/13483#discussion_r539600655



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -81,6 +72,12 @@
           x -> createTypeFactory().createSqlType(SqlTypeName.VARCHAR),
           new UdafImpl<>(new StringAgg.StringAggString()));
 
+  public static final SqlOperator ARR_AGG_ARR_FN =
+          createUdafOperator(
+                  "array_agg",
+                  x -> createTypeFactory().createSqlType(SqlTypeName.ARRAY),

Review comment:
       But `ARRAY_AGG` should support all element types, not just `INT64`. https://github.com/google/zetasql/blob/master/docs/aggregate_functions.md#array_agg
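
To illustrate the point about element types, here is a minimal plain-Java sketch of what a generic `ARRAY_AGG` does conceptually. It is not Beam or ZetaSQL code: the class `GenericArrayAggSketch` and the method `arrayAgg` are hypothetical names invented for this illustration. The only claim is that the aggregate collects its inputs into an array whose element type follows the input type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in (not Beam or ZetaSQL code): models ARRAY_AGG over any element type.
final class GenericArrayAggSketch {
  private GenericArrayAggSketch() {}

  // Collects every input value, duplicates included, into one list with the same element type.
  static <T> List<T> arrayAgg(Iterable<T> values) {
    List<T> result = new ArrayList<>();
    for (T value : values) {
      result.add(value);
    }
    return result;
  }

  public static void main(String[] args) {
    // INT64-like input, the same values as in the test query of this pull request.
    System.out.println(arrayAgg(Arrays.asList(2L, 1L, -2L, 3L, -2L, 1L, 2L)));
    // The same method works unchanged for strings.
    System.out.println(arrayAgg(Arrays.asList("a", "b", "a")));
  }
}
```

The type parameter `T` plays the role of the element type here, so nothing in the aggregate itself needs to be tied to `INT64`.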







[GitHub] [beam] sonam-vend commented on a change in pull request #13483: Implemeted ARRAY_AGG fn for Zetasql dialect

Posted by GitBox <gi...@apache.org>.
sonam-vend commented on a change in pull request #13483:
URL: https://github.com/apache/beam/pull/13483#discussion_r539602319



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -81,6 +72,12 @@
           x -> createTypeFactory().createSqlType(SqlTypeName.VARCHAR),
           new UdafImpl<>(new StringAgg.StringAggString()));
 
+  public static final SqlOperator ARR_AGG_ARR_FN =
+          createUdafOperator(
+                  "array_agg",
+                  x -> createTypeFactory().createSqlType(SqlTypeName.ARRAY),

Review comment:
       @ibzib Exactly. I tried to implement a generic array like this, but I am not sure if it is the correct way:
   
    private static RelDataType relDataType =
             new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
                     .builder()
                     .add("col_tinyint", SqlTypeName.TINYINT)
                     .add("col_smallint", SqlTypeName.SMALLINT)
                     .add("col_integer", SqlTypeName.INTEGER)
                     .add("col_bigint", SqlTypeName.BIGINT)
                     .add("col_float", SqlTypeName.FLOAT)
                     .add("col_double", SqlTypeName.DOUBLE)
                     .add("col_decimal", SqlTypeName.DECIMAL)
                     .add("col_string_varchar", SqlTypeName.VARCHAR)
                     .add("col_time", SqlTypeName.TIME)
                     .add("col_date", SqlTypeName.DATE)
                     .add("col_timestamp_with_local_time_zone", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
                     .add("col_timestamp", SqlTypeName.TIMESTAMP)
                     .add("col_boolean", SqlTypeName.BOOLEAN)
                     .build();
   
     public static final SqlOperator ARR_AGG_ARR_FN =
             createUdafOperator(
                     "array_agg",
                     x -> createTypeFactory().createArrayType(relDataType, -1),
                     new UdafImpl<>(new ArrayAgg.ArrayAggArray()));







[GitHub] [beam] robinyqiu commented on a change in pull request #13483: Implemeted ARRAY_AGG fn for Zetasql dialect

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #13483:
URL: https://github.com/apache/beam/pull/13483#discussion_r538842532



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
##########
@@ -4067,4 +4060,21 @@ public void testSimpleTableName() {
             Row.withSchema(singleField).addValues(15L).build());
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
+
+  @Test
+  public void testArrayAggregation() {
+    String sql =
+            "SELECT ARRAY_AGG(x) AS array_agg\n" +
+                    "FROM UNNEST([2, 1, -2, 3, -2, 1, 2]) AS x";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema schema = Schema.builder().addArrayField("array_field", FieldType.of(Schema.TypeName.ARRAY)).build();

Review comment:
       I think the schema we expect here is:
   
   ```
   Schema schema = Schema.builder().addArrayField("array_field", FieldType.INT64).build();
   ```
   
   meaning that the field is an "array of int64".







[GitHub] [beam] robinyqiu merged pull request #13483: [BEAM-11800] Support ARRAY_AGG fn for Zetasql dialect

Posted by GitBox <gi...@apache.org>.
robinyqiu merged pull request #13483:
URL: https://github.com/apache/beam/pull/13483


   





[GitHub] [beam] robinyqiu commented on a change in pull request #13483: Implemeted ARRAY_AGG fn for Zetasql dialect

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #13483:
URL: https://github.com/apache/beam/pull/13483#discussion_r538849960



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -81,6 +72,12 @@
           x -> createTypeFactory().createSqlType(SqlTypeName.VARCHAR),
           new UdafImpl<>(new StringAgg.StringAggString()));
 
+  public static final SqlOperator ARR_AGG_ARR_FN =
+          createUdafOperator(
+                  "array_agg",
+                  x -> createTypeFactory().createSqlType(SqlTypeName.ARRAY),

Review comment:
       Here you simply created an array type without specifying the element type. I guess that's why the error message says the "inferred type is ARRAY NOT NULL" (just an array type; you can ignore the "NOT NULL" suffix), where it should be "BIGINT NOT NULL ARRAY NOT NULL" (array of bigint).
   
   To create an array type with element type specified, you may want to use `createTypeFactory().createArrayType()`, like here: https://github.com/robinyqiu/beam/blob/cbe87445d4259b6b485bc010231dda1895022d83/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java#L170










[GitHub] [beam] robinyqiu commented on a change in pull request #13483: Implemeted ARRAY_AGG fn for Zetasql dialect

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #13483:
URL: https://github.com/apache/beam/pull/13483#discussion_r539664326



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -81,6 +72,12 @@
           x -> createTypeFactory().createSqlType(SqlTypeName.VARCHAR),
           new UdafImpl<>(new StringAgg.StringAggString()));
 
+  public static final SqlOperator ARR_AGG_ARR_FN =
+          createUdafOperator(
+                  "array_agg",
+                  x -> createTypeFactory().createSqlType(SqlTypeName.ARRAY),

Review comment:
       Yeah, ARRAY_AGG should be generic. I said the type should be "BIGINT NOT NULL ARRAY NOT NULL" because that's from the error message.
   
   Sonam, I believe there is a way to get the element type from the input `x`, so you don't need to define your own mapping.







[GitHub] [beam] sonam-vend commented on pull request #13483: [BEAM-11800] Support ARRAY_AGG fn for Zetasql dialect

Posted by GitBox <gi...@apache.org>.
sonam-vend commented on pull request #13483:
URL: https://github.com/apache/beam/pull/13483#issuecomment-781988264


   R: @robinyqiu 





[GitHub] [beam] sonam-vend commented on a change in pull request #13483: Implemeted ARRAY_AGG fn for Zetasql dialect

Posted by GitBox <gi...@apache.org>.
sonam-vend commented on a change in pull request #13483:
URL: https://github.com/apache/beam/pull/13483#discussion_r539983556



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -81,6 +72,12 @@
           x -> createTypeFactory().createSqlType(SqlTypeName.VARCHAR),
           new UdafImpl<>(new StringAgg.StringAggString()));
 
+  public static final SqlOperator ARR_AGG_ARR_FN =
+          createUdafOperator(
+                  "array_agg",
+                  x -> createTypeFactory().createSqlType(SqlTypeName.ARRAY),

Review comment:
       `x -> createTypeFactory().createArrayType(x.getOperandType(0), -1)` worked.
   
   Thanks @robinyqiu  @ibzib 
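
The idea behind that one-liner can be sketched without Calcite on the classpath. The toy model below is an assumption-laden illustration, not the project's code: `SqlType`, `scalar`, `arrayOf` and `inferArrayAggReturnType` are hypothetical names, and the printed type names only loosely mimic the ones in the error message quoted in this thread. It contrasts a bare `ARRAY` type that carries no element type with an array type derived from the operand type.

```java
import java.util.Objects;

// Hypothetical stand-ins for a SQL type system; these are NOT Calcite classes.
final class ArrayTypeInferenceSketch {
  private ArrayTypeInferenceSketch() {}

  // A type is either a scalar (elementType == null) or an array of elementType.
  static final class SqlType {
    final String name;
    final SqlType elementType;

    private SqlType(String name, SqlType elementType) {
      this.name = name;
      this.elementType = elementType;
    }

    static SqlType scalar(String name) {
      return new SqlType(name, null);
    }

    static SqlType arrayOf(SqlType elementType) {
      return new SqlType("ARRAY", Objects.requireNonNull(elementType));
    }

    @Override
    public String toString() {
      return elementType == null ? name : elementType + " ARRAY";
    }
  }

  // Same idea as deriving the result type from operand 0: the result is an array of the operand type.
  static SqlType inferArrayAggReturnType(SqlType operandType) {
    return SqlType.arrayOf(operandType);
  }

  public static void main(String[] args) {
    // A bare array type knows nothing about its elements.
    System.out.println(SqlType.scalar("ARRAY"));
    // Deriving the type from the operand keeps the element type, whatever it is.
    System.out.println(inferArrayAggReturnType(SqlType.scalar("BIGINT")));
    System.out.println(inferArrayAggReturnType(SqlType.scalar("VARCHAR")));
  }
}
```

Because the result type is computed from the operand, no per-type mapping is needed, which is the point made in the review.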







[GitHub] [beam] robinyqiu commented on a change in pull request #13483: [BEAM-11800] Support ARRAY_AGG fn for Zetasql dialect

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #13483:
URL: https://github.com/apache/beam/pull/13483#discussion_r578808090



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/udaf/ArrayAgg.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.udaf;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.transforms.Combine;
+
+public class ArrayAgg {
+
+  public static class ArrayAggArray extends Combine.CombineFn<Object, List<Object>, Object[]> {
+    @Override
+    public List<Object> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<Object> addInput(List<Object> accum, Object input) {
+      accum.add(input);
+      return accum;
+    }
+
+    @Override
+    public List<Object> mergeAccumulators(Iterable<List<Object>> accums) {
+      List<Object> merged = new ArrayList<>();
+      for (List<Object> accum : accums) {
+        for (Object o : accum) {
+          merged.add(o);
+        }
+      }
+      return merged;
+    }
+
+    @Override
+    public Object[] extractOutput(List<Object> accumulator) {

Review comment:
       The Beam `ARRAY` type expects the data to be a `Collection`, not an `Object[]` (i.e. Beam `ARRAY` != Java array). That's why you get the error during the cast. The fix should be simple: change the return type of this function to a list (and also the third generic type parameter of the class).
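
A rough sketch of the suggested change follows. To keep it self-contained, it uses a hypothetical abstract class `CombineFnSketch` that only imitates the four methods of Beam's `Combine.CombineFn` that appear in the diff above; the class `ArrayAggListSketch` is likewise an invented name. The relevant detail is that the third type parameter and the return type of `extractOutput` are `List<Object>` rather than `Object[]`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in that imitates the shape of Beam's Combine.CombineFn; not the Beam class.
abstract class CombineFnSketch<InputT, AccumT, OutputT> {
  abstract AccumT createAccumulator();

  abstract AccumT addInput(AccumT accum, InputT input);

  abstract AccumT mergeAccumulators(Iterable<AccumT> accums);

  abstract OutputT extractOutput(AccumT accum);
}

// The output type is List<Object> (a Collection), not Object[].
final class ArrayAggListSketch extends CombineFnSketch<Object, List<Object>, List<Object>> {
  @Override
  List<Object> createAccumulator() {
    return new ArrayList<>();
  }

  @Override
  List<Object> addInput(List<Object> accum, Object input) {
    accum.add(input);
    return accum;
  }

  @Override
  List<Object> mergeAccumulators(Iterable<List<Object>> accums) {
    List<Object> merged = new ArrayList<>();
    for (List<Object> accum : accums) {
      merged.addAll(accum);
    }
    return merged;
  }

  @Override
  List<Object> extractOutput(List<Object> accum) {
    // Return the list itself; no conversion to a Java array.
    return accum;
  }

  public static void main(String[] args) {
    ArrayAggListSketch fn = new ArrayAggListSketch();
    List<Object> first = fn.addInput(fn.addInput(fn.createAccumulator(), 2L), 1L);
    List<Object> second = fn.addInput(fn.createAccumulator(), -2L);
    System.out.println(fn.extractOutput(fn.mergeAccumulators(Arrays.asList(first, second))));
  }
}
```

In the real class the same change would presumably amount to replacing `Object[]` with `List<Object>` in the `extends` clause and in `extractOutput`, leaving the accumulator logic as it is.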



