Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/10 11:20:12 UTC

[GitHub] [beam] je-ik opened a new pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

je-ik opened a new pull request #14986:
URL: https://github.com/apache/beam/pull/14986


   Fixes [BEAM-12473] and adds a hint about a possible workaround: wrap the `CombineFn` in a typed anonymous subclass, for example
   ```java
   registerUdaf(
     "MAX_LENGTH",
     new UdfTypeUtils.CombineFnDelegate<>(
         Max.of(
             (Comparator<String> & Serializable)
                 (a, b) -> Integer.compare(a.length(), b.length()))) {})
   ```
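
The trailing `{}` in the snippet above is what matters: it creates an anonymous subclass, and a subclass records the type arguments of its superclass in its class file. The following minimal sketch shows that effect in plain Java. `TypedFn`, `GenericFn` and `ErasureDemo` are hypothetical stand-ins invented for this illustration, not Beam classes, and the sketch does not claim to mirror how `CombineFnDelegate` is implemented.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a generic function type such as CombineFn<InputT, ...>.
abstract class TypedFn<InputT> {
  /** Returns the InputT argument as declared on the direct superclass of the runtime class. */
  Type declaredInputType() {
    Type superType = getClass().getGenericSuperclass();
    if (superType instanceof ParameterizedType) {
      return ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
    return null;
  }
}

// A generic implementation: its InputT is still an unresolved type variable at runtime.
class GenericFn<T> extends TypedFn<T> {}

class ErasureDemo {
  // The <String> supplied at the call site is erased, so reflection only sees "T".
  static Type fromGenericInstance() {
    return new GenericFn<String>().declaredInputType();
  }

  // The anonymous subclass extends TypedFn<String>, so String is recoverable.
  static Type fromAnonymousSubclass() {
    return new TypedFn<String>() {}.declaredInputType();
  }

  public static void main(String[] args) {
    System.out.println(fromGenericInstance());
    System.out.println(fromAnonymousSubclass());
  }
}
```

In this sketch the first call yields a `TypeVariable` and the second yields `String.class`. A `TypeVariable` is the case in which no SQL type can be inferred.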


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik merged pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

Posted by GitBox <gi...@apache.org>.
je-ik merged pull request #14986:
URL: https://github.com/apache/beam/pull/14986


   





[GitHub] [beam] je-ik commented on pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14986:
URL: https://github.com/apache/beam/pull/14986#issuecomment-860728023


   @ibzib ready for another round. The added commit should be squashed before merge. I also fixed an additional bug in `CalciteUtils` in the handling of nested `ParameterizedType`s.





[GitHub] [beam] je-ik commented on a change in pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

Posted by GitBox <gi...@apache.org>.
je-ik commented on a change in pull request #14986:
URL: https://github.com/apache/beam/pull/14986#discussion_r650365075



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -70,7 +72,16 @@ public String getName() {
 
           @Override
           public RelDataType getType(RelDataTypeFactory typeFactory) {
-            return CalciteUtils.sqlTypeWithAutoCast(typeFactory, getInputType());
+            Type inputType = getInputType();
+            if (inputType instanceof TypeVariable) {
+              throw new IllegalArgumentException(
+                  "Unable to infer SQL type from type variable "
+                      + inputType
+                      + ". This usually means you are trying to use a generic type whose type information "
+                      + "is not known at runtime. You can wrap your CombineFn into typed subclass"
+                      + " by 'new UdfTypeUtils.CombineFnDelegate<>(combineFn) {}'");

Review comment:
       The code is valid on JDK 11+. Maybe JDK 8 users will be able to figure out how to fix that? Or we can spell out `<...>` in the hint.
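
For context on the JDK 8 concern: the diamond operator cannot be combined with an anonymous class body on Java 8, so the type arguments have to be written out. Below is a minimal sketch of the Java 8 compatible form using a hypothetical `Delegate` class (a stand-in, not the PR's `CombineFnDelegate`). With the real class the explicit arguments would presumably look something like `<String, Combine.Holder<String>, String>`, but that is an assumption not confirmed in this thread.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Comparator;

// Hypothetical stand-in for a delegating wrapper; meant to be subclassed anonymously.
class Delegate<InputT> {
  private final Comparator<InputT> delegate;

  protected Delegate(Comparator<InputT> delegate) {
    this.delegate = delegate;
  }

  /** Returns the larger of the two values according to the wrapped comparator. */
  InputT max(InputT a, InputT b) {
    return delegate.compare(a, b) >= 0 ? a : b;
  }

  /** Reads InputT from the generic superclass; only valid for direct subclasses. */
  Type inputType() {
    ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
    return superType.getActualTypeArguments()[0];
  }
}

class Java8Wrapping {
  static Delegate<String> byLength() {
    // Java 8 form: type arguments spelled out instead of the diamond operator.
    // On Java 9 and later, `new Delegate<>(...) {}` compiles as well.
    return new Delegate<String>(Comparator.comparingInt(String::length)) {};
  }

  public static void main(String[] args) {
    Delegate<String> fn = byLength();
    System.out.println(fn.inputType());
    System.out.println(fn.max("a", "bbb"));
  }
}
```

Spelling the arguments out costs a few characters but compiles on every supported JDK, which is one argument for putting `<...>` into the error message.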

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -70,7 +72,16 @@ public String getName() {
 
           @Override
           public RelDataType getType(RelDataTypeFactory typeFactory) {
-            return CalciteUtils.sqlTypeWithAutoCast(typeFactory, getInputType());
+            Type inputType = getInputType();
+            if (inputType instanceof TypeVariable) {

Review comment:
       Nice catch! We should handle at least `GenericArrayType`, since that can occur when the class extends `CombineFn<List<String>[], ...>`. `WildcardType` should not be possible there, because one cannot write `extends X<?>`.
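
To make the cases in this comment concrete, here is a small plain-Java sketch of which `java.lang.reflect.Type` subtype shows up as the first type argument of a generic superclass. `Fn`, `ArrayFn`, `ListFn`, `VarFn`, `PlainFn` and `TypeKinds` are hypothetical names used only for this illustration.

```java
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.List;

// Hypothetical stand-in for CombineFn<InputT, ...>, reduced to one type parameter.
abstract class Fn<InputT> {}

class PlainFn extends Fn<String> {}
class ListFn extends Fn<List<String>> {}
class ArrayFn extends Fn<List<String>[]> {}
class VarFn<T> extends Fn<T> {}
// class WildFn extends Fn<?> {}  // does not compile: a supertype cannot take a wildcard argument

class TypeKinds {
  /** Names the reflective kind of the first type argument of fnClass's generic superclass. */
  static String kindOf(Class<?> fnClass) {
    ParameterizedType superType = (ParameterizedType) fnClass.getGenericSuperclass();
    Type arg = superType.getActualTypeArguments()[0];
    if (arg instanceof Class) {
      return "Class";
    } else if (arg instanceof ParameterizedType) {
      return "ParameterizedType";
    } else if (arg instanceof GenericArrayType) {
      return "GenericArrayType";
    } else if (arg instanceof TypeVariable) {
      return "TypeVariable";
    } else if (arg instanceof WildcardType) {
      return "WildcardType";
    }
    return "unknown";
  }

  public static void main(String[] args) {
    System.out.println(kindOf(PlainFn.class));
    System.out.println(kindOf(ListFn.class));
    System.out.println(kindOf(ArrayFn.class));
    System.out.println(kindOf(VarFn.class));
  }
}
```

Note that a wildcard can still appear one level down, for example in `Fn<List<?>>`, where it is an argument of the nested `ParameterizedType` rather than the top-level argument.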

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -102,17 +117,23 @@ protected Type getOutputType() {
     return combineFn.getOutputType().getType();
   }
 
+  @Nullable
+  private Type getDeclaredInputType() {

Review comment:
       Hm, I didn't want to interfere with the previous logic too much. But maybe we can unify that. `getOutputType` calls `combineFn.getOutputType` anyway.

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFnTest.java
##########
@@ -63,6 +69,20 @@ public void mergeAccumulators() {
     assertEquals(2L, merged);
   }
 
+  @Test
+  public void testParameterExtractionFromCombineFn_CombineFnDelegate() {

Review comment:
       Alright, we will probably need more tests anyway, because of the `GenericArrayType` handling.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>
+      extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+    private final Combine.CombineFn<InputT, AccumT, OutputT> delegate;
+
+    protected CombineFnDelegate(Combine.CombineFn<InputT, AccumT, OutputT> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public AccumT createAccumulator() {
+      return delegate.createAccumulator();
+    }
+
+    @Override
+    public AccumT addInput(AccumT mutableAccumulator, InputT input) {
+      return delegate.addInput(mutableAccumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return delegate.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(AccumT accumulator) {
+      return delegate.extractOutput(accumulator);
+    }
+
+    @Override
+    public AccumT compact(AccumT accumulator) {
+      return delegate.compact(accumulator);
+    }
+
+    @Override
+    public OutputT apply(Iterable<? extends InputT> inputs) {
+      return delegate.apply(inputs);
+    }
+
+    @Override
+    public OutputT defaultValue() {
+      return delegate.defaultValue();
+    }
+
+    @Override
+    public TypeDescriptor<OutputT> getOutputType() {
+      return Optional.<TypeDescriptor<OutputT>>ofNullable(getGenericSuperTypeAtIndex(2))
+          .orElse(delegate.getOutputType());
+    }
+
+    @Override
+    public TypeDescriptor<InputT> getInputType() {
+      return Optional.<TypeDescriptor<InputT>>ofNullable(getGenericSuperTypeAtIndex(0))
+          .orElse(delegate.getInputType());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private <T> TypeDescriptor<T> getGenericSuperTypeAtIndex(int index) {
+      Type superClass = getClass().getGenericSuperclass();
+      if (superClass != null) {
+        ParameterizedType superType = (ParameterizedType) superClass;

Review comment:
       agree

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>

Review comment:
       :+1:

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {

Review comment:
       We do not need it. I just couldn't come up with a clear name for the inner class. It relates solely to the "type erasure problem", so what would be the best name? `CombineFnDelegate` as a top-level class in package `org.apache.beam.sdk.extensions.sql` feels weird to me. Maybe we could create a subpackage in `org.apache.beam.sdk.extensions.sql.udf` (maybe `util`)?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>
+      extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+    private final Combine.CombineFn<InputT, AccumT, OutputT> delegate;
+
+    protected CombineFnDelegate(Combine.CombineFn<InputT, AccumT, OutputT> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public AccumT createAccumulator() {
+      return delegate.createAccumulator();
+    }
+
+    @Override
+    public AccumT addInput(AccumT mutableAccumulator, InputT input) {
+      return delegate.addInput(mutableAccumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return delegate.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(AccumT accumulator) {
+      return delegate.extractOutput(accumulator);
+    }
+
+    @Override
+    public AccumT compact(AccumT accumulator) {
+      return delegate.compact(accumulator);
+    }
+
+    @Override
+    public OutputT apply(Iterable<? extends InputT> inputs) {
+      return delegate.apply(inputs);
+    }
+
+    @Override
+    public OutputT defaultValue() {
+      return delegate.defaultValue();
+    }
+
+    @Override
+    public TypeDescriptor<OutputT> getOutputType() {

Review comment:
       reformatted




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] je-ik commented on pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14986:
URL: https://github.com/apache/beam/pull/14986#issuecomment-859635737


   @ibzib can you help me review this, or can you suggest someone?





[GitHub] [beam] je-ik commented on pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14986:
URL: https://github.com/apache/beam/pull/14986#issuecomment-861207857


   > LGTM, though I agree we should probably move this to another package.
   
   I marked the class `@Experimental` so that we can move it to a different package if we find one. I didn't want to create a package for a single class at the moment.





[GitHub] [beam] ibzib commented on a change in pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #14986:
URL: https://github.com/apache/beam/pull/14986#discussion_r651272347



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
##########
@@ -313,16 +314,22 @@ public static RelDataType sqlTypeWithAutoCast(RelDataTypeFactory typeFactory, Ty
     } else if (type instanceof ParameterizedType) {
       ParameterizedType parameterizedType = (ParameterizedType) type;
       if (java.util.List.class.isAssignableFrom((Class<?>) parameterizedType.getRawType())) {
-        Class<?> genericType = (Class<?>) parameterizedType.getActualTypeArguments()[0];
-        RelDataType collectionElementType = typeFactory.createJavaType(genericType);
-        return typeFactory.createArrayType(collectionElementType, UNLIMITED_ARRAY_SIZE);
+        RelDataType elementType =
+            sqlTypeWithAutoCast(typeFactory, parameterizedType.getActualTypeArguments()[0]);

Review comment:
       Good catch here. We definitely want `sqlTypeWithAutoCast`, not `createJavaType`.
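
       As a rough illustration of why the blind `(Class<?>)` cast on the element type is fragile, the sketch below uses only JDK reflection. `NestedListTypeDemo` and its fields are hypothetical and not part of Beam, and a nested list is just one case where the first type argument is not a `Class`; this thread does not say which case triggered the original report.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical demo class for illustration; it is not part of Beam.
final class NestedListTypeDemo {
  // These fields exist only so their declared generic types can be read reflectively.
  static List<String> flat;
  static List<List<String>> nested;

  /** Returns the first type argument of the named field's declared generic type. */
  static Type elementTypeOf(String fieldName) {
    try {
      Type declared = NestedListTypeDemo.class.getDeclaredField(fieldName).getGenericType();
      return ((ParameterizedType) declared).getActualTypeArguments()[0];
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException("Unknown field: " + fieldName, e);
    }
  }

  public static void main(String[] args) {
    // List<String>: the element type is a plain Class, so a (Class<?>) cast succeeds.
    System.out.println(elementTypeOf("flat") instanceof Class);
    // List<List<String>>: the element type is itself a ParameterizedType,
    // so a (Class<?>) cast would throw ClassCastException.
    System.out.println(elementTypeOf("nested") instanceof ParameterizedType);
  }
}
```

       Passing the element `Type` back into the same resolution routine, as the new lines do, avoids assuming anything about its concrete kind.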

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {

Review comment:
       `org.apache.beam.sdk.extensions.sql.udf` is set aside for a newer version of the UDF API (it uses [AggregateFn](https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/udf/src/main/java/org/apache/beam/sdk/extensions/sql/udf/AggregateFn.java) instead of CombineFn for aggregates). I'm not sure there's a natural place for the new class. cc @apilloud 







[GitHub] [beam] ibzib commented on a change in pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #14986:
URL: https://github.com/apache/beam/pull/14986#discussion_r650217041



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>

Review comment:
       Add a javadoc comment.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -70,7 +72,16 @@ public String getName() {
 
           @Override
           public RelDataType getType(RelDataTypeFactory typeFactory) {
-            return CalciteUtils.sqlTypeWithAutoCast(typeFactory, getInputType());
+            Type inputType = getInputType();
+            if (inputType instanceof TypeVariable) {
+              throw new IllegalArgumentException(
+                  "Unable to infer SQL type from type variable "
+                      + inputType
+                      + ". This usually means you are trying to use a generic type whose type information "
+                      + "is not known at runtime. You can wrap your CombineFn into typed subclass"
+                      + " by 'new UdfTypeUtils.CombineFnDelegate<>(combineFn) {}'");

Review comment:
       Nit: if the user literally types `new UdfTypeUtils.CombineFnDelegate<>(combineFn) {}`, it will fail to compile: `Cannot use '<>' with anonymous inner classes`. Can we put in placeholders for the implementation types?
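
       To make the compile issue concrete, here is a minimal sketch using a hypothetical stand-in class (`TypedWrapper` is not Beam's `CombineFnDelegate`, and the type arguments are arbitrary placeholders). Spelling out the type arguments compiles on Java 8 and later, whereas the diamond form on an anonymous class is only accepted from Java 9 onward.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a generic delegate base class; not Beam's CombineFnDelegate.
abstract class TypedWrapper<InputT, AccumT, OutputT> {
  /** Returns the type arguments written where the direct subclass was declared. */
  Type[] declaredTypeArguments() {
    Type superType = getClass().getGenericSuperclass();
    if (superType instanceof ParameterizedType) {
      return ((ParameterizedType) superType).getActualTypeArguments();
    }
    return new Type[0];
  }
}

final class ExplicitTypeArgsDemo {
  static Type[] declaredArgs() {
    // Explicit type arguments: accepted by Java 8 and later.
    // "new TypedWrapper<>() {}" would be rejected by a Java 8 compiler.
    TypedWrapper<String, StringBuilder, Integer> wrapper =
        new TypedWrapper<String, StringBuilder, Integer>() {};
    return wrapper.declaredTypeArguments();
  }

  public static void main(String[] args) {
    for (Type type : declaredArgs()) {
      System.out.println(type.getTypeName());
    }
  }
}
```

       An error message with placeholders in this explicit form would therefore be copy-pasteable regardless of the user's Java version.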

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -70,7 +72,16 @@ public String getName() {
 
           @Override
           public RelDataType getType(RelDataTypeFactory typeFactory) {
-            return CalciteUtils.sqlTypeWithAutoCast(typeFactory, getInputType());
+            Type inputType = getInputType();
+            if (inputType instanceof TypeVariable) {

Review comment:
       I wonder if (here and elsewhere), instead of checking `inputType instanceof TypeVariable`, we should check `!(inputType instanceof ParameterizedType)`. Not sure if we are able to handle GenericArrayType or WildcardType. https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/reflect/Type.html
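
       For reference, the sketch below classifies a `java.lang.reflect.Type` into the kinds mentioned above, using only the JDK. `TypeKindDemo` and its `sample` method are hypothetical helpers for illustration. One thing it makes visible: a plain `Class` such as `String.class` is also not a `ParameterizedType`, so a negated check would presumably have to let plain classes through.

```java
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.List;

// Hypothetical helper for illustration; not part of Beam.
final class TypeKindDemo {
  /** Names which reflective kind the given Type belongs to. */
  static String kindOf(Type type) {
    if (type instanceof Class) {
      return "Class";
    } else if (type instanceof ParameterizedType) {
      return "ParameterizedType";
    } else if (type instanceof TypeVariable) {
      return "TypeVariable";
    } else if (type instanceof GenericArrayType) {
      return "GenericArrayType";
    } else if (type instanceof WildcardType) {
      return "WildcardType";
    }
    return "Unknown";
  }

  // Parameter types chosen so that each kind shows up once.
  static <T> void sample(
      String plain, List<String> parameterized, T variable, T[] genericArray, List<?> wildcard) {}

  /** Returns the generic parameter types of the sample method above. */
  static Type[] sampleParameterTypes() {
    for (Method method : TypeKindDemo.class.getDeclaredMethods()) {
      if (method.getName().equals("sample")) {
        return method.getGenericParameterTypes();
      }
    }
    throw new IllegalStateException("sample method not found");
  }

  public static void main(String[] args) {
    Type[] types = sampleParameterTypes();
    for (Type type : types) {
      System.out.println(type.getTypeName() + " is a " + kindOf(type));
    }
    // A wildcard only appears as a type argument, here inside List<?>.
    Type wildcard = ((ParameterizedType) types[4]).getActualTypeArguments()[0];
    System.out.println(wildcard.getTypeName() + " is a " + kindOf(wildcard));
  }
}
```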

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {

Review comment:
       Why do we need this outer class?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>
+      extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+    private final Combine.CombineFn<InputT, AccumT, OutputT> delegate;
+
+    protected CombineFnDelegate(Combine.CombineFn<InputT, AccumT, OutputT> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public AccumT createAccumulator() {
+      return delegate.createAccumulator();
+    }
+
+    @Override
+    public AccumT addInput(AccumT mutableAccumulator, InputT input) {
+      return delegate.addInput(mutableAccumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return delegate.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(AccumT accumulator) {
+      return delegate.extractOutput(accumulator);
+    }
+
+    @Override
+    public AccumT compact(AccumT accumulator) {
+      return delegate.compact(accumulator);
+    }
+
+    @Override
+    public OutputT apply(Iterable<? extends InputT> inputs) {
+      return delegate.apply(inputs);
+    }
+
+    @Override
+    public OutputT defaultValue() {
+      return delegate.defaultValue();
+    }
+
+    @Override
+    public TypeDescriptor<OutputT> getOutputType() {
+      return Optional.<TypeDescriptor<OutputT>>ofNullable(getGenericSuperTypeAtIndex(2))
+          .orElse(delegate.getOutputType());
+    }
+
+    @Override
+    public TypeDescriptor<InputT> getInputType() {
+      return Optional.<TypeDescriptor<InputT>>ofNullable(getGenericSuperTypeAtIndex(0))
+          .orElse(delegate.getInputType());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private <T> TypeDescriptor<T> getGenericSuperTypeAtIndex(int index) {
+      Type superClass = getClass().getGenericSuperclass();
+      if (superClass != null) {
+        ParameterizedType superType = (ParameterizedType) superClass;

Review comment:
       What if superClass isn't an instance of ParameterizedType? IIRC that would be possible if using multiple layers of inheritance. (Not sure why anyone would do that, but probably something we should handle regardless.)
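
       A small sketch of the multi-layer case, assuming hypothetical classes `Base`, `StringBase`, and `Leaf` (none of them are Beam classes): with one extra level of inheritance, `getGenericSuperclass()` returns a plain `Class`, so an unguarded cast to `ParameterizedType` would fail. An `instanceof` guard that returns `null` lets the caller fall back to the delegate's type.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical classes for illustration; not part of Beam.
abstract class Base<T> {
  /** Returns the argument bound to T by the direct superclass, or null if unavailable. */
  Type resolveTypeArgument() {
    Type superType = getClass().getGenericSuperclass();
    // Guard: the direct superclass may be a non-generic class, in which case
    // getGenericSuperclass() returns a Class rather than a ParameterizedType.
    if (superType instanceof ParameterizedType) {
      return ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
    return null;
  }
}

// One layer: the direct superclass is Base<String>.
class StringBase extends Base<String> {}

// Two layers: the direct superclass is the non-generic StringBase.
class Leaf extends StringBase {}

final class SuperclassGuardDemo {
  public static void main(String[] args) {
    System.out.println(new StringBase().resolveTypeArgument()); // the Class for String
    System.out.println(new Leaf().resolveTypeArgument()); // null, no ClassCastException
  }
}
```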

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+  public static class CombineFnDelegate<InputT, AccumT, OutputT>
+      extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+    private final Combine.CombineFn<InputT, AccumT, OutputT> delegate;
+
+    protected CombineFnDelegate(Combine.CombineFn<InputT, AccumT, OutputT> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public AccumT createAccumulator() {
+      return delegate.createAccumulator();
+    }
+
+    @Override
+    public AccumT addInput(AccumT mutableAccumulator, InputT input) {
+      return delegate.addInput(mutableAccumulator, input);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return delegate.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT extractOutput(AccumT accumulator) {
+      return delegate.extractOutput(accumulator);
+    }
+
+    @Override
+    public AccumT compact(AccumT accumulator) {
+      return delegate.compact(accumulator);
+    }
+
+    @Override
+    public OutputT apply(Iterable<? extends InputT> inputs) {
+      return delegate.apply(inputs);
+    }
+
+    @Override
+    public OutputT defaultValue() {
+      return delegate.defaultValue();
+    }
+
+    @Override
+    public TypeDescriptor<OutputT> getOutputType() {

Review comment:
       Nit: we could move getOutputType and getInputType to the top of the class to highlight they're the only methods that don't just defer to the delegate.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -102,17 +117,23 @@ protected Type getOutputType() {
     return combineFn.getOutputType().getType();
   }
 
+  @Nullable
+  private Type getDeclaredInputType() {

Review comment:
       Why do we need a separate method?

##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFnTest.java
##########
@@ -63,6 +69,20 @@ public void mergeAccumulators() {
     assertEquals(2L, merged);
   }
 
+  @Test
+  public void testParameterExtractionFromCombineFn_CombineFnDelegate() {

Review comment:
       This test is in class `LazyAggregateCombineFnTest`, but it doesn't use `LazyAggregateCombineFn` at all. Since `UdafImplTest.java` doesn't exist, let's create a new file for this test.







[GitHub] [beam] ibzib commented on a change in pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #14986:
URL: https://github.com/apache/beam/pull/14986#discussion_r651280045



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {

Review comment:
       `org.apache.beam.sdk.extensions.sql.udf` is set aside for a newer version of the UDF API (it uses [AggregateFn](https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/udf/src/main/java/org/apache/beam/sdk/extensions/sql/udf/AggregateFn.java) instead of CombineFn for aggregates).
   
   Since there is nothing SQL-specific about this solution, we might consider moving the class into the core Java SDK (`org.apache.beam.sdk.util`). 







[GitHub] [beam] je-ik commented on pull request #14986: [BEAM-12473] fix possible ClassCastException with UDAF

Posted by GitBox <gi...@apache.org>.
je-ik commented on pull request #14986:
URL: https://github.com/apache/beam/pull/14986#issuecomment-858538776


   I'm not 100% sure this is the best solution. We should definitely not throw a ClassCastException in the described case. My first iteration simply threw an exception explaining why this doesn't work, but then I realized that wrapping the CombineFn in an anonymous subclass solves the problem. I'm not sure whether the anonymous subclass should be created as well, but it looks reusable and actually helps a lot (at least with JDK 11+, where it is possible to create anonymous subclasses with the diamond operator, so the user does not need to know that the accumulator of Max is `Combine.Holder<T>`, which is nice).
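
   The effect of the anonymous subclass can be sketched with plain JDK reflection. `Fn` and `GenericFn` below are hypothetical stand-ins, not Beam classes: an instance of a generic class only exposes the type variable, while an anonymous subclass records the concrete type argument in its class metadata.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins for illustration; not Beam's CombineFn or CombineFnDelegate.
abstract class Fn<InputT> {
  /** Returns the type argument recorded for InputT by the direct superclass, or null. */
  Type inputType() {
    Type superType = getClass().getGenericSuperclass();
    if (superType instanceof ParameterizedType) {
      return ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
    return null;
  }
}

// A generic implementation: its superclass is Fn<T>, with T left unresolved.
class GenericFn<T> extends Fn<T> {}

final class ErasureDemo {
  /** The type seen through a generic instance: the type variable T. */
  static Type erased() {
    return new GenericFn<String>().inputType();
  }

  /** The type seen through an anonymous subclass: the concrete argument. */
  static Type captured() {
    return new Fn<String>() {}.inputType();
  }

  public static void main(String[] args) {
    System.out.println(erased().getTypeName());
    System.out.println(captured().getTypeName());
  }
}
```

   The `String` in `new GenericFn<String>()` exists only at compile time, which is why reflection reports a `TypeVariable` there.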

