Posted to issues@flink.apache.org by "Aitozi (via GitHub)" <gi...@apache.org> on 2023/04/25 09:43:22 UTC

[GitHub] [flink] Aitozi opened a new pull request, #22485: [FLINK-31835][planner] Fix the array type can't be converted from ext…

Aitozi opened a new pull request, #22485:
URL: https://github.com/apache/flink/pull/22485

   …ernal primitive array
   
   
   ## What is the purpose of the change
   
   In the UDF below, the type of the `t` field in the returned row is a `CollectionDataType` of `ARRAY<INT NOT NULL>`, but its conversion class is `Integer[]`. As a result, the external `int[]` array cannot be converted to the internal type and fails with the exception below; a small sketch after the stack trace shows where the mismatch surfaces.
   
   ```
       public static class RowFunction extends ScalarFunction {
           @DataTypeHint("Row<t ARRAY<INT NOT NULL>>")
            public Row eval() {
                int[] i = new int[3];
                return Row.of(i);
           }
       }
   ```
   
   ```
   Caused by: java.lang.ClassCastException: class [I cannot be cast to class [Ljava.lang.Object; ([I and [Ljava.lang.Object; are in module java.base of loader 'bootstrap')
   org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
   org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
   org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
   org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
   org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
   StreamExecCalc$251.processElement_split9(Unknown Source)
   StreamExecCalc$251.processElement(Unknown Source)
   org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) 
   ```
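   
   For context, a minimal sketch (illustrative class name, not code from this PR) of where the mismatch surfaces: the declared conversion class decides which internal converter is picked, so an object-array converter ends up receiving a primitive array.
   
    ```
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.conversion.DataStructureConverter;
    import org.apache.flink.table.data.conversion.DataStructureConverters;
    import org.apache.flink.table.types.DataType;

    public class ConverterMismatchSketch {
        public static void main(String[] args) {
            // The field type declared by the @DataTypeHint above.
            DataType arrayType = DataTypes.ARRAY(DataTypes.INT().notNull());

            // The converter is chosen from the type's conversion class. If that
            // class is Integer[], the object-array converter is used.
            DataStructureConverter<Object, Object> converter =
                    DataStructureConverters.getConverter(arrayType);
            converter.open(Thread.currentThread().getContextClassLoader());

            // Before this fix, feeding it the external int[] produced by the UDF
            // fails with the ClassCastException from the stack trace above.
            converter.toInternal(new int[] {1, 2, 3});
        }
    }
    ```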
   
   
   ## Brief change log
   
   - Change the default conversion class of primitive types so that it respects nullability (a small sketch of the intended defaults follows this list).
   - Change the conversion class accordingly when a data type's nullability is transformed.
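   
   A rough sketch of the intended default mapping after the first change (the expected classes are inferred from this change log, so treat them as the intended outcome rather than a guarantee):
   
    ```
    import org.apache.flink.table.api.DataTypes;

    public class DefaultConversionSketch {
        public static void main(String[] args) {
            // Intended defaults after this change:
            //   ARRAY<INT NOT NULL> -> int[]     (elements can never be null)
            //   ARRAY<INT>          -> Integer[] (elements may be null)
            System.out.println(
                    DataTypes.ARRAY(DataTypes.INT().notNull()).getConversionClass());
            System.out.println(
                    DataTypes.ARRAY(DataTypes.INT()).getConversionClass());
        }
    }
    ```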
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   




[GitHub] [flink] wuchong commented on pull request #22485: [FLINK-31835][planner] Fix the array type can't be converted from ext…

Posted by "wuchong (via GitHub)" <gi...@apache.org>.
wuchong commented on PR #22485:
URL: https://github.com/apache/flink/pull/22485#issuecomment-1586986673

   The failing test case is tracked by [FLINK-32204](https://issues.apache.org/jira/browse/FLINK-32204). Will merge this.




[GitHub] [flink] Aitozi commented on pull request #22485: [FLINK-31835][planner] Fix the array type can't be converted from ext…

Posted by "Aitozi (via GitHub)" <gi...@apache.org>.
Aitozi commented on PR #22485:
URL: https://github.com/apache/flink/pull/22485#issuecomment-1582213873

   > Hi @Aitozi , could you add tests (a UDF with `@DataTypeHint`) for the case reported in your issue? The rest looks good to me.
   
   @wuchong done




[GitHub] [flink] Aitozi commented on a diff in pull request #22485: [FLINK-31835][planner] Fix the array type can't be converted from ext…

Posted by "Aitozi (via GitHub)" <gi...@apache.org>.
Aitozi commented on code in PR #22485:
URL: https://github.com/apache/flink/pull/22485#discussion_r1177360955


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java:
##########
@@ -332,7 +332,23 @@ public void testRegisteringValuesWithComplexTypes() {
         mapData.put(1, 1);
         mapData.put(2, 2);
 
-        Row row = Row.of(mapData, Row.of(1, 2, 3), new Integer[] {1, 2});
+        Row row = Row.of(mapData, Row.of(1, 2, 3), new int[] {1, 2});

Review Comment:
   The type of this row is `ROW<.., .., ARRAY<INT NOT NULL>>`, so we have to provide `new int[] {1, 2}` to pass the `containsExactly` assertion.





[GitHub] [flink] wuchong merged pull request #22485: [FLINK-31835][planner] Fix the array type can't be converted from ext…

Posted by "wuchong (via GitHub)" <gi...@apache.org>.
wuchong merged PR #22485:
URL: https://github.com/apache/flink/pull/22485




[GitHub] [flink] wuchong commented on a diff in pull request #22485: [FLINK-31835][planner] Fix the array type can't be converted from ext…

Posted by "wuchong (via GitHub)" <gi...@apache.org>.
wuchong commented on code in PR #22485:
URL: https://github.com/apache/flink/pull/22485#discussion_r1226135797


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -1329,6 +1329,28 @@ public void testUsingAddJar() throws Exception {
                 "drop function lowerUdf");
     }
 
+    @Test
+    public void testArrayWithPrimitiveType() {
+        List<Row> sourceData = Arrays.asList(Row.of(1, 2), Row.of(3, 4));
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        tEnv().executeSql(
+                        "CREATE TABLE SourceTable(i INT NOT NULL, j INT NOT NULL) WITH ('connector' = 'COLLECTION')");
+        tEnv().executeSql(
+                        "CREATE FUNCTION row_of_array AS '"
+                                + RowOfArrayFunction.class.getName()
+                                + "'");
+        List<Row> rows =
+                CollectionUtil.iteratorToList(
+                        tEnv().executeSql("SELECT row_of_array(i, j) FROM SourceTable").collect());
+        assertThat(rows)
+                .isEqualTo(
+                        Arrays.asList(
+                                Row.of(Row.of(new int[] {1, 2})),
+                                Row.of(Row.of(new int[] {3, 4}))));

Review Comment:
   This makes it unclear whether the primitive array is a single argument or a vararg expansion. Cast it to `Object` to make it explicit: `Row.of(Row.of((Object) new int[]{1, 2}))`.
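   
   For illustration, a small standalone sketch (made-up values, not part of the PR) of why the cast helps: a boxed `Integer[]` is an `Object[]` and is therefore spread into several fields by `Row.of(Object...)`, while an `int[]` is always passed as one value, so the cast only spells that out for the reader:
   
   ```
   import org.apache.flink.types.Row;

   public class RowVarargsSketch {
       public static void main(String[] args) {
           // Integer[] is an Object[]; the varargs call expands it into two fields.
           Row spread = Row.of(new Integer[] {1, 2});
           System.out.println(spread.getArity()); // prints 2

           // int[] is not an Object[]; it is wrapped as a single field either way,
           // and the explicit cast makes that intent visible at the call site.
           Row single = Row.of((Object) new int[] {1, 2});
           System.out.println(single.getArity()); // prints 1
       }
   }
   ```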



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -1756,6 +1778,14 @@ public Boolean eval(@DataTypeHint("BOOLEAN NOT NULL") Boolean b) {
         }
     }
 
+    /** A function that returns a row containing a primitive-typed array, for testing FLINK-31835. */
+    public static class RowOfArrayFunction extends ScalarFunction {
+        @DataTypeHint("Row<t ARRAY<INT NOT NULL>>")

Review Comment:
   Could you add a test for `ARRAY<INT>`? 
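   
   For reference, a sketch of what such a companion function might look like (hypothetical class name, mirroring the pattern of `RowOfArrayFunction` above; this is not the code that was actually added):
   
   ```
   /** Hypothetical companion function covering the nullable ARRAY<INT> case. */
   public static class RowOfBoxedArrayFunction extends ScalarFunction {
       @DataTypeHint("Row<t ARRAY<INT>>")
       public Row eval(Integer... v) {
           // The cast is required here: without it the boxed array would be
           // expanded as varargs into several row fields.
           return Row.of((Object) v);
       }
   }
   ```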



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -1756,6 +1778,14 @@ public Boolean eval(@DataTypeHint("BOOLEAN NOT NULL") Boolean b) {
         }
     }
 
+    /** A function that returns a row containing a primitive-typed array, for testing FLINK-31835. */
+    public static class RowOfArrayFunction extends ScalarFunction {
+        @DataTypeHint("Row<t ARRAY<INT NOT NULL>>")
+        public Row eval(int... v) {
+            return Row.of(v);

Review Comment:
   Same here. Cast to `(Object)`. 





[GitHub] [flink] flinkbot commented on pull request #22485: [FLINK-31835][planner] Fix the array type can't be converted from ext…

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22485:
URL: https://github.com/apache/flink/pull/22485#issuecomment-1521496544

   ## CI report:
   
   * cfa23eca0351e51b98fd315126d9002f72e5e0fc UNKNOWN
   
   Bot commands
   
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] Aitozi commented on pull request #22485: [FLINK-31835][planner] Fix the array type can't be converted from ext…

Posted by "Aitozi (via GitHub)" <gi...@apache.org>.
Aitozi commented on PR #22485:
URL: https://github.com/apache/flink/pull/22485#issuecomment-1584354342

   The CI failure seems unrelated.




[GitHub] [flink] Aitozi commented on a diff in pull request #22485: [FLINK-31835][planner] Fix the array type can't be converted from ext…

Posted by "Aitozi (via GitHub)" <gi...@apache.org>.
Aitozi commented on code in PR #22485:
URL: https://github.com/apache/flink/pull/22485#discussion_r1226152539


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -1329,6 +1329,28 @@ public void testUsingAddJar() throws Exception {
                 "drop function lowerUdf");
     }
 
+    @Test
+    public void testArrayWithPrimitiveType() {
+        List<Row> sourceData = Arrays.asList(Row.of(1, 2), Row.of(3, 4));
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        tEnv().executeSql(
+                        "CREATE TABLE SourceTable(i INT NOT NULL, j INT NOT NULL) WITH ('connector' = 'COLLECTION')");
+        tEnv().executeSql(
+                        "CREATE FUNCTION row_of_array AS '"
+                                + RowOfArrayFunction.class.getName()
+                                + "'");
+        List<Row> rows =
+                CollectionUtil.iteratorToList(
+                        tEnv().executeSql("SELECT row_of_array(i, j) FROM SourceTable").collect());
+        assertThat(rows)
+                .isEqualTo(
+                        Arrays.asList(
+                                Row.of(Row.of(new int[] {1, 2})),
+                                Row.of(Row.of(new int[] {3, 4}))));

Review Comment:
   updated



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -1756,6 +1778,14 @@ public Boolean eval(@DataTypeHint("BOOLEAN NOT NULL") Boolean b) {
         }
     }
 
+    /** A function with Row of array with primitive type as return type for test FLINK-31835. */
+    public static class RowOfArrayFunction extends ScalarFunction {
+        @DataTypeHint("Row<t ARRAY<INT NOT NULL>>")

Review Comment:
   updated


