Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/06 13:00:17 UTC

[GitHub] [flink] snuyanzin opened a new pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

snuyanzin opened a new pull request #18287:
URL: https://github.com/apache/flink/pull/18287


   ## What is the purpose of the change
   This PR adds support for casting maps to maps and multisets to multisets.
   
   
   ## Brief change log
     -  Casting rule `org.apache.flink.table.planner.functions.casting.MapToMapAndMultisetToMultisetCastRule`
     - Tests in `org.apache.flink.table.planner.functions.casting.CastRulesTest`
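
As a rough illustration of what a map-to-map cast has to do conceptually, the sketch below casts every key and every value with a per-element cast. It is a toy model under stated assumptions, not the code the cast rule generates: `MapCastSketch` and `castMap` are hypothetical names, and plain `java.util.Map` stands in for Flink's internal map representation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy illustration only; hypothetical names, not part of Flink. */
final class MapCastSketch {

    /** Casts every key and value of {@code source} using the given element casts. */
    static <K1, V1, K2, V2> Map<K2, V2> castMap(
            Map<K1, V1> source, Function<K1, K2> keyCast, Function<V1, V2> valueCast) {
        if (source == null) {
            return null; // in this sketch, a null map stays null
        }
        Map<K2, V2> result = new HashMap<>();
        for (Map.Entry<K1, V1> entry : source.entrySet()) {
            result.put(keyCast.apply(entry.getKey()), valueCast.apply(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> input = new HashMap<>();
        input.put(1, 2);
        // Cast MAP<INT, INT> to MAP<STRING, STRING> element by element.
        Map<String, String> output =
                castMap(input, k -> String.valueOf(k), v -> String.valueOf(v));
        System.out.println(output); // prints {1=2}
    }
}
```

Note that two distinct source keys could cast to the same target key; in this sketch the entry written last wins, which is an assumption of the toy and not a statement about the rule's behavior.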
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
     - Added tests in `CastRulesTest` that validate casting of maps to maps, multisets to multisets
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050",
       "triggerID" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29056",
       "triggerID" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d1626610d3ad7597336f09ac661e62e86da336b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29066",
       "triggerID" : "2d1626610d3ad7597336f09ac661e62e86da336b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99096432609f559ded33520e48c75fce7aeb22da",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29100",
       "triggerID" : "99096432609f559ded33520e48c75fce7aeb22da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 99096432609f559ded33520e48c75fce7aeb22da Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29100) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] slinkydeveloper commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r780981547



##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -1142,14 +1146,27 @@ protected Configuration configuration() {
 
     public static List<TestSpec> constructedTypes() {
         return Arrays.asList(
-                // https://issues.apache.org/jira/browse/FLINK-17321
-                // MULTISET
-                // MAP
+                CastTestSpecBuilder.testCastTo(MAP(STRING(), STRING()))
+                        .fromCase(MAP(FLOAT(), DOUBLE()), null, null)
+                        .fromCase(
+                                MAP(INT(), INT()),
+                                Collections.singletonMap(1, 2),
+                                Collections.singletonMap("1", "2"))

Review comment:
       It should not fail; the goal of this issue is to allow such casting. I'm actually surprised that it doesn't fail without touching `LogicalTypeCasts`, which is the class that checks for valid and invalid casts.
   
   EDIT: I've done some debugging. `LogicalTypeCasts#supportsConstructedCasting` does the job: it checks that the type roots are equal and that the children are castable. So it works :smile:
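
The check described in this comment (equal type roots, castable children) can be pictured with a small toy model. Everything below is hypothetical: `ConstructedCastSketch`, its `Type` and `Root` types, and the leaf rule are assumptions made for illustration and do not mirror the actual `LogicalTypeCasts` code.

```java
import java.util.Arrays;
import java.util.List;

/** Toy type model; hypothetical names that do not mirror Flink's classes. */
final class ConstructedCastSketch {

    enum Root { INT, STRING, ROW, MAP }

    static final class Type {
        final Root root;
        final List<Type> children;

        Type(Root root, Type... children) {
            this.root = root;
            this.children = Arrays.asList(children);
        }
    }

    static Type of(Root root, Type... children) {
        return new Type(root, children);
    }

    /** Assumed leaf rule for this sketch only: INT and STRING cast freely to each other. */
    static boolean supportsLeafCast(Root from, Root to) {
        return (from == Root.INT || from == Root.STRING)
                && (to == Root.INT || to == Root.STRING);
    }

    static boolean supportsCast(Type from, Type to) {
        if (from.children.isEmpty() && to.children.isEmpty()) {
            return supportsLeafCast(from.root, to.root);
        }
        // Constructed types: same root, same arity, and every child must be castable.
        if (from.root != to.root || from.children.size() != to.children.size()) {
            return false;
        }
        for (int i = 0; i < from.children.size(); i++) {
            if (!supportsCast(from.children.get(i), to.children.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Type mapIntInt = of(Root.MAP, of(Root.INT), of(Root.INT));
        Type mapStrStr = of(Root.MAP, of(Root.STRING), of(Root.STRING));
        Type rowIntInt = of(Root.ROW, of(Root.INT), of(Root.INT));
        System.out.println(supportsCast(mapIntInt, mapStrStr)); // prints true
        System.out.println(supportsCast(mapIntInt, rowIntInt)); // prints false
    }
}
```

In this toy model no map-specific validity rule is needed: the recursion over children is enough to accept `MAP<INT, INT>` to `MAP<STRING, STRING>` and to reject a cast between different roots.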







[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050",
       "triggerID" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29056",
       "triggerID" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c6b7bc3002138813504da329f2b0024f4f05afa5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29056) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] slinkydeveloper commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r780326206



##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -1142,14 +1146,27 @@ protected Configuration configuration() {
 
     public static List<TestSpec> constructedTypes() {
         return Arrays.asList(
-                // https://issues.apache.org/jira/browse/FLINK-17321
-                // MULTISET
-                // MAP
+                CastTestSpecBuilder.testCastTo(MAP(STRING(), STRING()))
+                        .fromCase(MAP(FLOAT(), DOUBLE()), null, null)
+                        .fromCase(
+                                MAP(INT(), INT()),
+                                Collections.singletonMap(1, 2),
+                                Collections.singletonMap("1", "2"))

Review comment:
       :thinking: I'm surprised that this works without changing anything in `LogicalTypeCasts`... Can you add a test case for failure? For example:
   
   ```
   CastTestSpecBuilder
     .testCastTo(MAP(STRING(), STRING()))
     .fail(MAP(STRING(), ROW(INT())), whateverValue)
   ```
   
   Same for ROW to ROW and ARRAY to ARRAY. Just pick any invalid pair of inner types and see which exception you get. If it fails inside one of the individual `CastRule` implementations, then something is wrong with `LogicalTypeCasts`.







[GitHub] [flink] slinkydeveloper edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1007282991


   @snuyanzin I would prefer that the multiset e2e casting be worked out in a separate issue, as it requires significantly more testing to check that it works throughout the stack and, as you have shown in the commits, it also requires new function definitions to expose it to the user. That's something we should address in a separate context.
   
   As a goal for this task, I would say we want:
   
   * `CastRulesTest` tests for array to array, row to row, map to map and multiset to multiset
   * `CastFunctionITCase` tests for array to array, row to row and map to map.





[GitHub] [flink] snuyanzin commented on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1007025074


   Thanks for pointing out the issue. I had a look; however, it does not seem to help with the current multiset issue.
   After some debugging I noticed that support for multisets is missing when multiset expressions are resolved.
   I added some changes to make casting work.
   I'm not sure whether it makes sense to have them under this issue or under a separate one.
   





[GitHub] [flink] snuyanzin commented on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1007440699


   OK, I removed the multiset-related fix from this PR and created a separate issue for it: https://issues.apache.org/jira/browse/FLINK-25567
   
   Now it should match the goal; please let me know if it does not.





[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050",
       "triggerID" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29056",
       "triggerID" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d1626610d3ad7597336f09ac661e62e86da336b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29066",
       "triggerID" : "2d1626610d3ad7597336f09ac661e62e86da336b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2d1626610d3ad7597336f09ac661e62e86da336b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29066) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] slinkydeveloper edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1007282991









[GitHub] [flink] flinkbot commented on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006575627


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit a65fb5e01c2b97f13a16146a27084af3282a0c71 (Thu Jan 06 13:06:01 UTC 2022)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046) 
   * 6f663ca1bc470616102d1b9232bb164bba41bc7b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>








[GitHub] [flink] snuyanzin edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
snuyanzin edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1007025074


   @slinkydeveloper Thanks for pointing out the issue. I had a look; however, it does not seem to help with the current multiset issue.
   After some debugging I noticed that support for multisets is missing when multiset expressions are resolved.
   I added some changes to make casting work.
   I'm not sure whether it makes sense to have them under this issue or under a separate one.
   





[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050",
       "triggerID" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046) 
   * 6f663ca1bc470616102d1b9232bb164bba41bc7b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050) 
   * c6b7bc3002138813504da329f2b0024f4f05afa5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] twalthr closed pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #18287:
URL: https://github.com/apache/flink/pull/18287


   





[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050",
       "triggerID" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29056",
       "triggerID" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d1626610d3ad7597336f09ac661e62e86da336b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29066",
       "triggerID" : "2d1626610d3ad7597336f09ac661e62e86da336b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99096432609f559ded33520e48c75fce7aeb22da",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29100",
       "triggerID" : "99096432609f559ded33520e48c75fce7aeb22da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2d1626610d3ad7597336f09ac661e62e86da336b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29066) 
   * 99096432609f559ded33520e48c75fce7aeb22da Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29100) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] slinkydeveloper commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r779537294



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapToMapAndMultisetToMultisetCastRule.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.boxedTypeTermForType;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+
+/**
+ * {@link LogicalTypeRoot#MAP} to {@link LogicalTypeRoot#MAP} and {@link LogicalTypeRoot#MULTISET}
+ * to {@link LogicalTypeRoot#MULTISET} cast rule.
+ */
+public class MapToMapAndMultisetToMultisetCastRule

Review comment:
       No need for `public` here; package-private is fine.







[GitHub] [flink] twalthr commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r786612124



##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
##########
@@ -722,9 +722,9 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case ARRAY_VALUE_CONSTRUCTOR =>
         generateArray(ctx, resultType, operands)
 
-      // maps and multisets
-      case MAP_VALUE_CONSTRUCTOR | MULTISET_VALUE =>
-        generateMapOrMultiset(ctx, resultType, operands)
+      // maps
+      case MAP_VALUE_CONSTRUCTOR =>

Review comment:
       @snuyanzin Looking forward to a PR for this commit. Maybe you can also address the comment about the value constructor in this PR (using a `HashMap` to deduplicate keys).
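
As a rough illustration of the key deduplication mentioned above: this is a sketch only, not the planner's actual code, and the names `MapConstructorSketch` and `buildMap` are hypothetical. A value constructor that collects alternating key/value operands into a `java.util.HashMap` keeps a single entry per key, with a later value overwriting an earlier one.

```java
import java.util.HashMap;
import java.util.Map;

public class MapConstructorSketch {

    // Hypothetical helper (not part of Flink): builds a map from alternating
    // key/value operands, mimicking MAP[k1, v1, k2, v2, ...].
    public static Map<Object, Object> buildMap(Object... operands) {
        if (operands.length % 2 != 0) {
            throw new IllegalArgumentException("Expected an even number of operands");
        }
        Map<Object, Object> map = new HashMap<>();
        for (int i = 0; i < operands.length; i += 2) {
            // put() replaces the value of an existing key, so duplicate keys collapse.
            map.put(operands[i], operands[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        Map<Object, Object> map = buildMap("a", 1, "b", 2, "a", 3);
        System.out.println(map.size());   // 2
        System.out.println(map.get("a")); // 3
    }
}
```

Whether the last value or the first value should win for a duplicate key is a design decision this sketch does not settle; `HashMap.put` simply makes the last one win.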







[GitHub] [flink] snuyanzin commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r787556416



##########
File path: flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
##########
@@ -722,9 +722,9 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
       case ARRAY_VALUE_CONSTRUCTOR =>
         generateArray(ctx, resultType, operands)
 
-      // maps and multisets
-      case MAP_VALUE_CONSTRUCTOR | MULTISET_VALUE =>
-        generateMapOrMultiset(ctx, resultType, operands)
+      // maps
+      case MAP_VALUE_CONSTRUCTOR =>

Review comment:
       Hi @twalthr, thanks for highlighting this.
   Yes, I will try to handle this.







[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046) 
   * 6f663ca1bc470616102d1b9232bb164bba41bc7b UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046) 
   * 6f663ca1bc470616102d1b9232bb164bba41bc7b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050) 
   * c6b7bc3002138813504da329f2b0024f4f05afa5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29056) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   ## CI report:
   
   * c6b7bc3002138813504da329f2b0024f4f05afa5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29056) 
   * 2d1626610d3ad7597336f09ac661e62e86da336b UNKNOWN
   



[GitHub] [flink] flinkbot commented on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 UNKNOWN
   



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r786601681



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapToMapAndMultisetToMultisetCastRule.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.boxedTypeTermForType;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+
+/**
+ * {@link LogicalTypeRoot#MAP} to {@link LogicalTypeRoot#MAP} and {@link LogicalTypeRoot#MULTISET}
+ * to {@link LogicalTypeRoot#MULTISET} cast rule.
+ */
+class MapToMapAndMultisetToMultisetCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<MapData, MapData> {
+
+    static final MapToMapAndMultisetToMultisetCastRule INSTANCE =
+            new MapToMapAndMultisetToMultisetCastRule();
+
+    private MapToMapAndMultisetToMultisetCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                MapToMapAndMultisetToMultisetCastRule
+                                        ::isValidMapToMapOrMultisetToMultisetCasting)
+                        .build());
+    }
+
+    private static boolean isValidMapToMapOrMultisetToMultisetCasting(
+            LogicalType input, LogicalType target) {
+        return input.is(LogicalTypeRoot.MAP)
+                        && target.is(LogicalTypeRoot.MAP)
+                        && CastRuleProvider.resolve(
+                                        ((MapType) input).getKeyType(),
+                                        ((MapType) target).getKeyType())
+                                != null
+                        && CastRuleProvider.resolve(
+                                        ((MapType) input).getValueType(),
+                                        ((MapType) target).getValueType())
+                                != null
+                || input.is(LogicalTypeRoot.MULTISET)
+                        && target.is(LogicalTypeRoot.MULTISET)
+                        && CastRuleProvider.resolve(
+                                        ((MultisetType) input).getElementType(),
+                                        ((MultisetType) target).getElementType())
+                                != null;
+    }
+
+    /* Example generated code for MULTISET<INT> -> MULTISET<FLOAT>:
+    org.apache.flink.table.data.MapData _myInput = ((org.apache.flink.table.data.MapData)(_myInputObj));
+    boolean _myInputIsNull = _myInputObj == null;
+    boolean isNull$0;
+    org.apache.flink.table.data.MapData result$1;
+    float result$2;
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        java.util.Map map$838 = new java.util.HashMap();
+        for (int i$841 = 0; i$841 < _myInput.size(); i$841++) {
+            java.lang.Float key$839 = null;
+            java.lang.Integer value$840 = null;
+            if (!_myInput.keyArray().isNullAt(i$841)) {
+                result$2 = ((float)(_myInput.keyArray().getInt(i$841)));
+                key$839 = result$2;
+            }
+            value$840 = _myInput.valueArray().getInt(i$841);
+            map$838.put(key$839, value$840);
+        }
+        result$1 = new org.apache.flink.table.data.GenericMapData(map$838);
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = null;
+    }
+    return result$1;
+
+     */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final LogicalType innerInputKeyType;
+        final LogicalType innerInputValueType;
+
+        final LogicalType innerTargetKeyType;
+        final LogicalType innerTargetValueType;
+        if (inputLogicalType.is(LogicalTypeRoot.MULTISET)) {
+            innerInputKeyType = ((MultisetType) inputLogicalType).getElementType();
+            innerInputValueType = new IntType(false);
+            innerTargetKeyType = ((MultisetType) targetLogicalType).getElementType();
+            innerTargetValueType = new IntType(false);
+        } else {
+            innerInputKeyType = ((MapType) inputLogicalType).getKeyType();
+            innerInputValueType = ((MapType) inputLogicalType).getValueType();
+            innerTargetKeyType = ((MapType) targetLogicalType).getKeyType();
+            innerTargetValueType = ((MapType) targetLogicalType).getValueType();
+        }
+
+        final String innerTargetKeyTypeTerm = boxedTypeTermForType(innerTargetKeyType);
+        final String innerTargetValueTypeTerm = boxedTypeTermForType(innerTargetValueType);
+        final String keyArrayTerm = methodCall(inputTerm, "keyArray");
+        final String valueArrayTerm = methodCall(inputTerm, "valueArray");
+        final String size = methodCall(inputTerm, "size");
+        final String map = newName("map");
+        final String key = newName("key");
+        final String value = newName("value");
+
+        return new CastRuleUtils.CodeWriter()
+                .declStmt(className(Map.class), map, constructorCall(HashMap.class))
+                .forStmt(
+                        size,
+                        (index, codeWriter) -> {
+                            CastCodeBlock keyCodeBlock =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            rowFieldReadAccess(
+                                                    index, keyArrayTerm, innerInputKeyType),
+                                            "false",
+                                            // Null check is done at the array access level
+                                            innerInputKeyType.copy(false),
+                                            innerTargetKeyType);
+
+                            CastCodeBlock valueCodeBlock =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            rowFieldReadAccess(
+                                                    index, valueArrayTerm, innerInputValueType),
+                                            "false",
+                                            // Null check is done at the array access level
+                                            innerInputValueType.copy(false),

Review comment:
       Yep, could you perhaps fix it when merging?
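
For readers following the quoted diff, the loop that the rule generates can be approximated in plain Java as below. This is a simplified sketch under stated assumptions, not the generated code itself: `MapCastSketch`, `castMap`, `keyCast`, and `valueCast` are hypothetical names, and a plain `java.util.Map` stands in for Flink's `MapData`. Each key and value is cast individually, nulls are passed through, and the results are collected into a new `HashMap`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class MapCastSketch {

    // Hypothetical helper: casts every key and value of the input map with the
    // given per-element casts. A null input yields a null result, and null
    // elements are kept as null instead of being passed to the casts.
    public static <K1, V1, K2, V2> Map<K2, V2> castMap(
            Map<K1, V1> input, Function<K1, K2> keyCast, Function<V1, V2> valueCast) {
        if (input == null) {
            return null;
        }
        Map<K2, V2> result = new HashMap<>();
        for (Map.Entry<K1, V1> entry : input.entrySet()) {
            K2 key = entry.getKey() == null ? null : keyCast.apply(entry.getKey());
            V2 value = entry.getValue() == null ? null : valueCast.apply(entry.getValue());
            result.put(key, value);
        }
        return result;
    }

    public static void main(String[] args) {
        // Roughly MULTISET<INT> -> MULTISET<FLOAT>: element 1 with count 2.
        Map<Integer, Integer> multiset = new HashMap<>();
        multiset.put(1, 2);
        Map<Float, Integer> cast = castMap(multiset, k -> k.floatValue(), v -> v);
        System.out.println(cast); // {1.0=2}
    }
}
```

Note that two distinct input keys may map to the same target key after casting, in which case the `HashMap` keeps only one entry.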







[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046) 
   * 6f663ca1bc470616102d1b9232bb164bba41bc7b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050) 
   * c6b7bc3002138813504da329f2b0024f4f05afa5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29056) 
   



[GitHub] [flink] snuyanzin commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r780465918



##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -1142,14 +1146,27 @@ protected Configuration configuration() {
 
     public static List<TestSpec> constructedTypes() {
         return Arrays.asList(
-                // https://issues.apache.org/jira/browse/FLINK-17321
-                // MULTISET
-                // MAP
+                CastTestSpecBuilder.testCastTo(MAP(STRING(), STRING()))
+                        .fromCase(MAP(FLOAT(), DOUBLE()), null, null)
+                        .fromCase(
+                                MAP(INT(), INT()),
+                                Collections.singletonMap(1, 2),
+                                Collections.singletonMap("1", "2"))

Review comment:
       I didn't get it: should it fail?
   ```sql
   SELECT CAST(MAP['a', row(1)] AS MAP<STRING, STRING>);
   ```
   I thought it should return something like this:
   ```
   {a=(1)}
   ```







[GitHub] [flink] slinkydeveloper commented on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1007282991


   @snuyanzin I would prefer that the multiset e2e casting be worked out in a separate issue, as it requires significantly more testing to check that it works throughout the stack.
   
   As a goal for this task, having the map and multiset tests in `CastRulesTest` green and the map-only test in `CastFunctionITCase` green is enough.





[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   ## CI report:
   
   * 2d1626610d3ad7597336f09ac661e62e86da336b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29066) 
   * 99096432609f559ded33520e48c75fce7aeb22da UNKNOWN
   



[GitHub] [flink] twalthr commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r786603466



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapToMapAndMultisetToMultisetCastRule.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.boxedTypeTermForType;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+
+/**
+ * {@link LogicalTypeRoot#MAP} to {@link LogicalTypeRoot#MAP} and {@link LogicalTypeRoot#MULTISET}
+ * to {@link LogicalTypeRoot#MULTISET} cast rule.
+ */
+class MapToMapAndMultisetToMultisetCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<MapData, MapData> {
+
+    static final MapToMapAndMultisetToMultisetCastRule INSTANCE =
+            new MapToMapAndMultisetToMultisetCastRule();
+
+    private MapToMapAndMultisetToMultisetCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                MapToMapAndMultisetToMultisetCastRule
+                                        ::isValidMapToMapOrMultisetToMultisetCasting)
+                        .build());
+    }
+
+    private static boolean isValidMapToMapOrMultisetToMultisetCasting(
+            LogicalType input, LogicalType target) {
+        return input.is(LogicalTypeRoot.MAP)
+                        && target.is(LogicalTypeRoot.MAP)
+                        && CastRuleProvider.resolve(
+                                        ((MapType) input).getKeyType(),
+                                        ((MapType) target).getKeyType())
+                                != null
+                        && CastRuleProvider.resolve(
+                                        ((MapType) input).getValueType(),
+                                        ((MapType) target).getValueType())
+                                != null
+                || input.is(LogicalTypeRoot.MULTISET)
+                        && target.is(LogicalTypeRoot.MULTISET)
+                        && CastRuleProvider.resolve(
+                                        ((MultisetType) input).getElementType(),
+                                        ((MultisetType) target).getElementType())
+                                != null;
+    }
+
+    /* Example generated code for MULTISET<INT> -> MULTISET<FLOAT>:
+    org.apache.flink.table.data.MapData _myInput = ((org.apache.flink.table.data.MapData)(_myInputObj));
+    boolean _myInputIsNull = _myInputObj == null;
+    boolean isNull$0;
+    org.apache.flink.table.data.MapData result$1;
+    float result$2;
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        java.util.Map map$838 = new java.util.HashMap();
+        for (int i$841 = 0; i$841 < _myInput.size(); i$841++) {
+            java.lang.Float key$839 = null;
+            java.lang.Integer value$840 = null;
+            if (!_myInput.keyArray().isNullAt(i$841)) {
+                result$2 = ((float)(_myInput.keyArray().getInt(i$841)));
+                key$839 = result$2;
+            }
+            value$840 = _myInput.valueArray().getInt(i$841);
+            map$838.put(key$839, value$840);
+        }
+        result$1 = new org.apache.flink.table.data.GenericMapData(map$838);
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = null;
+    }
+    return result$1;
+
+     */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final LogicalType innerInputKeyType;
+        final LogicalType innerInputValueType;
+
+        final LogicalType innerTargetKeyType;
+        final LogicalType innerTargetValueType;
+        if (inputLogicalType.is(LogicalTypeRoot.MULTISET)) {
+            innerInputKeyType = ((MultisetType) inputLogicalType).getElementType();
+            innerInputValueType = new IntType(false);
+            innerTargetKeyType = ((MultisetType) targetLogicalType).getElementType();
+            innerTargetValueType = new IntType(false);
+        } else {
+            innerInputKeyType = ((MapType) inputLogicalType).getKeyType();
+            innerInputValueType = ((MapType) inputLogicalType).getValueType();
+            innerTargetKeyType = ((MapType) targetLogicalType).getKeyType();
+            innerTargetValueType = ((MapType) targetLogicalType).getValueType();
+        }
+
+        final String innerTargetKeyTypeTerm = boxedTypeTermForType(innerTargetKeyType);
+        final String innerTargetValueTypeTerm = boxedTypeTermForType(innerTargetValueType);
+        final String keyArrayTerm = methodCall(inputTerm, "keyArray");
+        final String valueArrayTerm = methodCall(inputTerm, "valueArray");
+        final String size = methodCall(inputTerm, "size");
+        final String map = newName("map");
+        final String key = newName("key");
+        final String value = newName("value");
+
+        return new CastRuleUtils.CodeWriter()
+                .declStmt(className(Map.class), map, constructorCall(HashMap.class))

Review comment:
       A cast to `MAP<CHAR(1), X>` could produce multiple keys. But after an offline discussion, I think it is safe to use a HashMap here; the problem lies rather in `ScalarOperatorGens#generateMap`.
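
The key-collision concern can be illustrated outside Flink. The following is a minimal plain-Java sketch, not the code from this PR: `KeyCollisionSketch` and `castKeyToChar1` are hypothetical names, and the sketch assumes that a cast to `CHAR(1)` keeps only the first character of the key. It shows that two distinct input keys can collapse into one entry when the cast results are put into a `HashMap`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; no Flink classes are involved.
final class KeyCollisionSketch {

    // Hypothetical stand-in for a cast to CHAR(1): keep at most the first character.
    static String castKeyToChar1(String key) {
        return key.length() <= 1 ? key : key.substring(0, 1);
    }

    // Same shape as the generated loop: cast each key, then put it into a HashMap.
    static Map<String, Integer> castKeys(Map<String, Integer> input) {
        Map<String, Integer> result = new HashMap<>();
        for (Map.Entry<String, Integer> entry : input.entrySet()) {
            // A later entry silently overwrites an earlier one with the same cast key.
            result.put(castKeyToChar1(entry.getKey()), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so "ab" is processed before "ac".
        Map<String, Integer> input = new LinkedHashMap<>();
        input.put("ab", 1);
        input.put("ac", 2);
        System.out.println(castKeys(input)); // prints {a=2}
    }
}
```

In this sketch the last entry wins, which is ordinary `HashMap.put` behaviour; how duplicates should be resolved in Flink itself is the open question raised in the comment above.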







[GitHub] [flink] slinkydeveloper commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r786602145



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapToMapAndMultisetToMultisetCastRule.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.boxedTypeTermForType;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+
+/**
+ * {@link LogicalTypeRoot#MAP} to {@link LogicalTypeRoot#MAP} and {@link LogicalTypeRoot#MULTISET}
+ * to {@link LogicalTypeRoot#MULTISET} cast rule.
+ */
+class MapToMapAndMultisetToMultisetCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<MapData, MapData> {
+
+    static final MapToMapAndMultisetToMultisetCastRule INSTANCE =
+            new MapToMapAndMultisetToMultisetCastRule();
+
+    private MapToMapAndMultisetToMultisetCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                MapToMapAndMultisetToMultisetCastRule
+                                        ::isValidMapToMapOrMultisetToMultisetCasting)
+                        .build());
+    }
+
+    private static boolean isValidMapToMapOrMultisetToMultisetCasting(
+            LogicalType input, LogicalType target) {
+        return input.is(LogicalTypeRoot.MAP)
+                        && target.is(LogicalTypeRoot.MAP)
+                        && CastRuleProvider.resolve(
+                                        ((MapType) input).getKeyType(),
+                                        ((MapType) target).getKeyType())
+                                != null
+                        && CastRuleProvider.resolve(
+                                        ((MapType) input).getValueType(),
+                                        ((MapType) target).getValueType())
+                                != null
+                || input.is(LogicalTypeRoot.MULTISET)
+                        && target.is(LogicalTypeRoot.MULTISET)
+                        && CastRuleProvider.resolve(
+                                        ((MultisetType) input).getElementType(),
+                                        ((MultisetType) target).getElementType())
+                                != null;
+    }
+
+    /* Example generated code for MULTISET<INT> -> MULTISET<FLOAT>:
+    org.apache.flink.table.data.MapData _myInput = ((org.apache.flink.table.data.MapData)(_myInputObj));
+    boolean _myInputIsNull = _myInputObj == null;
+    boolean isNull$0;
+    org.apache.flink.table.data.MapData result$1;
+    float result$2;
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        java.util.Map map$838 = new java.util.HashMap();
+        for (int i$841 = 0; i$841 < _myInput.size(); i$841++) {
+            java.lang.Float key$839 = null;
+            java.lang.Integer value$840 = null;
+            if (!_myInput.keyArray().isNullAt(i$841)) {
+                result$2 = ((float)(_myInput.keyArray().getInt(i$841)));
+                key$839 = result$2;
+            }
+            value$840 = _myInput.valueArray().getInt(i$841);
+            map$838.put(key$839, value$840);
+        }
+        result$1 = new org.apache.flink.table.data.GenericMapData(map$838);
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = null;
+    }
+    return result$1;
+
+     */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final LogicalType innerInputKeyType;
+        final LogicalType innerInputValueType;
+
+        final LogicalType innerTargetKeyType;
+        final LogicalType innerTargetValueType;
+        if (inputLogicalType.is(LogicalTypeRoot.MULTISET)) {
+            innerInputKeyType = ((MultisetType) inputLogicalType).getElementType();
+            innerInputValueType = new IntType(false);
+            innerTargetKeyType = ((MultisetType) targetLogicalType).getElementType();
+            innerTargetValueType = new IntType(false);
+        } else {
+            innerInputKeyType = ((MapType) inputLogicalType).getKeyType();
+            innerInputValueType = ((MapType) inputLogicalType).getValueType();
+            innerTargetKeyType = ((MapType) targetLogicalType).getKeyType();
+            innerTargetValueType = ((MapType) targetLogicalType).getValueType();
+        }
+
+        final String innerTargetKeyTypeTerm = boxedTypeTermForType(innerTargetKeyType);
+        final String innerTargetValueTypeTerm = boxedTypeTermForType(innerTargetValueType);
+        final String keyArrayTerm = methodCall(inputTerm, "keyArray");
+        final String valueArrayTerm = methodCall(inputTerm, "valueArray");
+        final String size = methodCall(inputTerm, "size");
+        final String map = newName("map");
+        final String key = newName("key");
+        final String value = newName("value");
+
+        return new CastRuleUtils.CodeWriter()
+                .declStmt(className(Map.class), map, constructorCall(HashMap.class))

Review comment:
       I think this is safe at this level, as the input map is assumed to be valid and should not have multiple values for the same key anyway.
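
For readers who want to follow the generated loop quoted in the diff above, here is a plain-Java analogue for `MULTISET<INT>` to `MULTISET<FLOAT>`. It is only a sketch: `MultisetCastLoopSketch` and `castElements` are hypothetical names, the multiset is modelled as a `java.util.Map` from element to count rather than Flink's `MapData`, and only the element (the map key) is cast while the count is copied unchanged.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java analogue of the generated cast loop; no Flink classes are involved.
final class MultisetCastLoopSketch {

    // Casts every element of the multiset (modelled as element -> count) from int to float.
    static Map<Float, Integer> castElements(Map<Integer, Integer> input) {
        if (input == null) {
            return null; // a null input yields a null result, as in the generated code
        }
        Map<Float, Integer> result = new HashMap<>();
        for (Map.Entry<Integer, Integer> entry : input.entrySet()) {
            Float key = null; // stays null when the input element is null
            if (entry.getKey() != null) {
                key = (float) entry.getKey().intValue();
            }
            result.put(key, entry.getValue()); // the count is copied as-is
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> input = new HashMap<>();
        input.put(1, 2);
        input.put(null, 3);
        System.out.println(castElements(input));
    }
}
```

As long as the input holds each key once and the key cast maps distinct keys to distinct results, no entry is lost by the `put` calls.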







[GitHub] [flink] slinkydeveloper commented on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1007282991


   @snuyanzin I would prefer that the multiset e2e casting be worked out in a separate issue, as it requires significantly more testing to check that it works throughout the stack.
   
   For this task, it is enough that the map and multiset tests in `CastRulesTest` and the map-only test in `CastFunctionITCase` are green.





[GitHub] [flink] slinkydeveloper commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r780326206



##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java
##########
@@ -1142,14 +1146,27 @@ protected Configuration configuration() {
 
     public static List<TestSpec> constructedTypes() {
         return Arrays.asList(
-                // https://issues.apache.org/jira/browse/FLINK-17321
-                // MULTISET
-                // MAP
+                CastTestSpecBuilder.testCastTo(MAP(STRING(), STRING()))
+                        .fromCase(MAP(FLOAT(), DOUBLE()), null, null)
+                        .fromCase(
+                                MAP(INT(), INT()),
+                                Collections.singletonMap(1, 2),
+                                Collections.singletonMap("1", "2"))

Review comment:
       :thinking: I'm surprised that this works without changing anything in `LogicalTypeCasts`... Can you add a test case for failure? For example:
   
   ```
   CastTestSpecBuilder
     .testCastTo(MAP(STRING(), STRING()))
     .fail(MAP(STRING(), ROW(INT())), whateverValue)
   ```
   
   The same applies to ROW to ROW and ARRAY to ARRAY. Pick any invalid pair of inner types and see which exception you get. If the failure occurs inside an individual `CastRule` implementation, then something is wrong with `LogicalTypeCasts`.
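
The idea behind such a failure test is that a cast between constructed types should be rejected up front whenever any inner cast is unsupported. The sketch below shows that recursive check in plain Java. Everything in it is hypothetical: `NestedCastCheckSketch`, its tiny type model, and `supportsCast` are not Flink APIs, and the rule that ROW cannot be cast to an atomic type is an assumption made for illustration only, not a statement about what Flink supports.

```java
// Plain-Java illustration of a recursive "is this cast supported" check.
final class NestedCastCheckSketch {

    // Hypothetical, heavily simplified type model; not Flink's LogicalType hierarchy.
    abstract static class Type {}

    static final class Atomic extends Type {
        final String name;

        Atomic(String name) {
            this.name = name;
        }
    }

    static final class MapOf extends Type {
        final Type key;
        final Type value;

        MapOf(Type key, Type value) {
            this.key = key;
            this.value = value;
        }
    }

    static final class RowOf extends Type {
        final Type field;

        RowOf(Type field) {
            this.field = field;
        }
    }

    static boolean supportsCast(Type from, Type to) {
        if (from instanceof Atomic && to instanceof Atomic) {
            return true; // assumption: every atomic-to-atomic cast is allowed
        }
        if (from instanceof MapOf && to instanceof MapOf) {
            MapOf f = (MapOf) from;
            MapOf t = (MapOf) to;
            // A map cast is valid only if both the key cast and the value cast are valid.
            return supportsCast(f.key, t.key) && supportsCast(f.value, t.value);
        }
        return false; // assumption: any other combination is unsupported in this sketch
    }

    public static void main(String[] args) {
        Type string = new Atomic("STRING");
        Type intType = new Atomic("INT");
        // MAP<INT, INT> -> MAP<STRING, STRING>
        System.out.println(supportsCast(new MapOf(intType, intType), new MapOf(string, string)));
        // MAP<STRING, ROW<INT>> -> MAP<STRING, STRING>
        System.out.println(
                supportsCast(new MapOf(string, new RowOf(intType)), new MapOf(string, string)));
    }
}
```

Under these assumptions the first check succeeds and the second is rejected before any per-element cast code would run, which is the layer at which the review comment expects the failure to surface.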







[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050",
       "triggerID" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29056",
       "triggerID" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2d1626610d3ad7597336f09ac661e62e86da336b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29066",
       "triggerID" : "2d1626610d3ad7597336f09ac661e62e86da336b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c6b7bc3002138813504da329f2b0024f4f05afa5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29056) 
   * 2d1626610d3ad7597336f09ac661e62e86da336b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29066) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] snuyanzin commented on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1007440699


   OK, I removed the multiset-related fix from this PR and created a separate issue for it: https://issues.apache.org/jira/browse/FLINK-25567
   
   The PR should now match the goal; please let me know if it does not.





[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] snuyanzin commented on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
snuyanzin commented on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006658745


   I added tests for map and multiset to `CastFunctionITCase`. The map test passes, while the multiset test fails with:
   ```
   org.apache.flink.table.api.ValidationException: Could not cast the value of the 1 column: [ map(1, 1) ] of a row: [ null, map(1, 1) ] to the requested type: MULTISET<INT>
   
   	at org.apache.flink.table.operations.utils.ValuesOperationFactory.lambda$null$4(ValuesOperationFactory.java:130)
   	at java.util.Optional.orElseThrow(Optional.java:290)
   	at org.apache.flink.table.operations.utils.ValuesOperationFactory.lambda$convertTopLevelExpressionToExpectedRowType$5(ValuesOperationFactory.java:127)
   	at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
   	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
   ```
   It seems the cause is not `LogicalTypeCasts` but a missing converter for multiset here: https://github.com/apache/flink/blob/c5581b8b41e04c1b2fabe3cee57efb1253b4066d/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/ValuesOperationFactory.java#L190
   
   I will have a look at it later today.





[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050",
       "triggerID" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046) 
   * 6f663ca1bc470616102d1b9232bb164bba41bc7b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050",
       "triggerID" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c6b7bc3002138813504da329f2b0024f4f05afa5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046) 
   * 6f663ca1bc470616102d1b9232bb164bba41bc7b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050) 
   * c6b7bc3002138813504da329f2b0024f4f05afa5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] slinkydeveloper commented on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006663411


   Hey @snuyanzin, have you checked this issue? There might be something helpful for you in the patch I posted there: https://issues.apache.org/jira/browse/FLINK-25428.
   
   If the multiset test is a blocker for you and there is no easy fix, you can ignore it for now, as multiset is still not well supported by the whole stack. Perhaps you can just leave the test there, commented out.





[GitHub] [flink] slinkydeveloper edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
slinkydeveloper edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1007282991


   @snuyanzin I would prefer that the multiset e2e casting be worked out in a separate issue, as it requires significantly more testing to check that it works throughout the stack and, as you have shown in the commits, it also requires new function definitions to expose it to the user. That is something we should address in a separate context.
   
   As a goal for this task, I would say we want:
   
   * `CastRulesTest` tests for array to array, row to row, map to map, and multiset to multiset
   * `CastFunctionITCase` tests for array to array, row to row, and map to map.





[GitHub] [flink] flinkbot edited a comment on pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18287:
URL: https://github.com/apache/flink/pull/18287#issuecomment-1006572849


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046",
       "triggerID" : "a65fb5e01c2b97f13a16146a27084af3282a0c71",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050",
       "triggerID" : "6f663ca1bc470616102d1b9232bb164bba41bc7b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a65fb5e01c2b97f13a16146a27084af3282a0c71 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29046) 
   * 6f663ca1bc470616102d1b9232bb164bba41bc7b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29050) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] twalthr commented on a change in pull request #18287: [FLINK-17321][table] Add support casting of map to map and multiset to multiset

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #18287:
URL: https://github.com/apache/flink/pull/18287#discussion_r786586583



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapToMapAndMultisetToMultisetCastRule.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.boxedTypeTermForType;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+
+/**
+ * {@link LogicalTypeRoot#MAP} to {@link LogicalTypeRoot#MAP} and {@link LogicalTypeRoot#MULTISET}
+ * to {@link LogicalTypeRoot#MULTISET} cast rule.
+ */
+class MapToMapAndMultisetToMultisetCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<MapData, MapData> {
+
+    static final MapToMapAndMultisetToMultisetCastRule INSTANCE =
+            new MapToMapAndMultisetToMultisetCastRule();
+
+    private MapToMapAndMultisetToMultisetCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                MapToMapAndMultisetToMultisetCastRule
+                                        ::isValidMapToMapOrMultisetToMultisetCasting)
+                        .build());
+    }
+
+    private static boolean isValidMapToMapOrMultisetToMultisetCasting(
+            LogicalType input, LogicalType target) {
+        return input.is(LogicalTypeRoot.MAP)
+                        && target.is(LogicalTypeRoot.MAP)
+                        && CastRuleProvider.resolve(
+                                        ((MapType) input).getKeyType(),
+                                        ((MapType) target).getKeyType())
+                                != null
+                        && CastRuleProvider.resolve(
+                                        ((MapType) input).getValueType(),
+                                        ((MapType) target).getValueType())
+                                != null
+                || input.is(LogicalTypeRoot.MULTISET)
+                        && target.is(LogicalTypeRoot.MULTISET)
+                        && CastRuleProvider.resolve(
+                                        ((MultisetType) input).getElementType(),
+                                        ((MultisetType) target).getElementType())
+                                != null;
+    }
+
+    /* Example generated code for MULTISET<INT> -> MULTISET<FLOAT>:
+    org.apache.flink.table.data.MapData _myInput = ((org.apache.flink.table.data.MapData)(_myInputObj));
+    boolean _myInputIsNull = _myInputObj == null;
+    boolean isNull$0;
+    org.apache.flink.table.data.MapData result$1;
+    float result$2;
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        java.util.Map map$838 = new java.util.HashMap();
+        for (int i$841 = 0; i$841 < _myInput.size(); i$841++) {
+            java.lang.Float key$839 = null;
+            java.lang.Integer value$840 = null;
+            if (!_myInput.keyArray().isNullAt(i$841)) {
+                result$2 = ((float)(_myInput.keyArray().getInt(i$841)));
+                key$839 = result$2;
+            }
+            value$840 = _myInput.valueArray().getInt(i$841);
+            map$838.put(key$839, value$840);
+        }
+        result$1 = new org.apache.flink.table.data.GenericMapData(map$838);
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = null;
+    }
+    return result$1;
+
+     */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final LogicalType innerInputKeyType;
+        final LogicalType innerInputValueType;
+
+        final LogicalType innerTargetKeyType;
+        final LogicalType innerTargetValueType;
+        if (inputLogicalType.is(LogicalTypeRoot.MULTISET)) {
+            innerInputKeyType = ((MultisetType) inputLogicalType).getElementType();
+            innerInputValueType = new IntType(false);
+            innerTargetKeyType = ((MultisetType) targetLogicalType).getElementType();
+            innerTargetValueType = new IntType(false);
+        } else {
+            innerInputKeyType = ((MapType) inputLogicalType).getKeyType();
+            innerInputValueType = ((MapType) inputLogicalType).getValueType();
+            innerTargetKeyType = ((MapType) targetLogicalType).getKeyType();
+            innerTargetValueType = ((MapType) targetLogicalType).getValueType();
+        }
+
+        final String innerTargetKeyTypeTerm = boxedTypeTermForType(innerTargetKeyType);
+        final String innerTargetValueTypeTerm = boxedTypeTermForType(innerTargetValueType);
+        final String keyArrayTerm = methodCall(inputTerm, "keyArray");
+        final String valueArrayTerm = methodCall(inputTerm, "valueArray");
+        final String size = methodCall(inputTerm, "size");
+        final String map = newName("map");
+        final String key = newName("key");
+        final String value = newName("value");
+
+        return new CastRuleUtils.CodeWriter()
+                .declStmt(className(Map.class), map, constructorCall(HashMap.class))

Review comment:
       I'm not sure if a `HashMap` is safe to use, because not all internal data structures have proper `hashCode/equals`. Looking at `org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens#generateMap`, I also don't see any `HashMap`. A safer solution might be to reuse the array to array casting logic and apply it to `keyArray` and `valueArray`. In general, we don't guarantee unique keys for maps.
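
The two concerns can be reproduced with a small standalone sketch. It uses plain Java arrays and `java.util.HashMap` only, not Flink's `MapData` or the generated cast code; the class and method names are hypothetical, and the double-to-int conversion merely stands in for any key cast that can map distinct inputs to the same output.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class MapCastSketch {

    /** Casts double keys to int keys by collecting into a HashMap; colliding keys are merged. */
    static Map<Integer, String> castViaHashMap(double[] keys, String[] values) {
        Map<Integer, String> result = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            result.put((int) keys[i], values[i]);
        }
        return result;
    }

    /** Casts the key array element by element; paired with the untouched values, no entry is lost. */
    static int[] castKeyArray(double[] keys) {
        int[] result = new int[keys.length];
        for (int i = 0; i < keys.length; i++) {
            result[i] = (int) keys[i];
        }
        return result;
    }

    /** Counts HashMap entries when the key type (byte[]) has identity-based equals/hashCode. */
    static int distinctByteArrayKeys() {
        Map<byte[], Integer> map = new HashMap<>();
        map.put(new byte[] {1, 2}, 1);
        map.put(new byte[] {1, 2}, 2); // same content, different identity
        return map.size();
    }

    public static void main(String[] args) {
        double[] keys = {1.2, 1.7};
        String[] values = {"a", "b"};

        System.out.println(castViaHashMap(keys, values).size()); // prints 1
        System.out.println(Arrays.toString(castKeyArray(keys))); // prints [1, 1]
        System.out.println(distinctByteArrayKeys()); // prints 2
    }
}
```

The `HashMap` route silently drops an entry once two keys collide after the cast, and it treats equal-content keys as distinct when the key type lacks content-based `equals`/`hashCode`. Casting the key and value arrays separately keeps the entry count unchanged in both cases.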

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/MapToMapAndMultisetToMultisetCastRule.java
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.casting;
+
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.boxedTypeTermForType;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.className;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
+import static org.apache.flink.table.planner.codegen.CodeGenUtils.rowFieldReadAccess;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
+import static org.apache.flink.table.planner.functions.casting.CastRuleUtils.methodCall;
+
+/**
+ * {@link LogicalTypeRoot#MAP} to {@link LogicalTypeRoot#MAP} and {@link LogicalTypeRoot#MULTISET}
+ * to {@link LogicalTypeRoot#MULTISET} cast rule.
+ */
+class MapToMapAndMultisetToMultisetCastRule
+        extends AbstractNullAwareCodeGeneratorCastRule<MapData, MapData> {
+
+    static final MapToMapAndMultisetToMultisetCastRule INSTANCE =
+            new MapToMapAndMultisetToMultisetCastRule();
+
+    private MapToMapAndMultisetToMultisetCastRule() {
+        super(
+                CastRulePredicate.builder()
+                        .predicate(
+                                MapToMapAndMultisetToMultisetCastRule
+                                        ::isValidMapToMapOrMultisetToMultisetCasting)
+                        .build());
+    }
+
+    private static boolean isValidMapToMapOrMultisetToMultisetCasting(
+            LogicalType input, LogicalType target) {
+        return input.is(LogicalTypeRoot.MAP)
+                        && target.is(LogicalTypeRoot.MAP)
+                        && CastRuleProvider.resolve(
+                                        ((MapType) input).getKeyType(),
+                                        ((MapType) target).getKeyType())
+                                != null
+                        && CastRuleProvider.resolve(
+                                        ((MapType) input).getValueType(),
+                                        ((MapType) target).getValueType())
+                                != null
+                || input.is(LogicalTypeRoot.MULTISET)
+                        && target.is(LogicalTypeRoot.MULTISET)
+                        && CastRuleProvider.resolve(
+                                        ((MultisetType) input).getElementType(),
+                                        ((MultisetType) target).getElementType())
+                                != null;
+    }
+
+    /* Example generated code for MULTISET<INT> -> MULTISET<FLOAT>:
+    org.apache.flink.table.data.MapData _myInput = ((org.apache.flink.table.data.MapData)(_myInputObj));
+    boolean _myInputIsNull = _myInputObj == null;
+    boolean isNull$0;
+    org.apache.flink.table.data.MapData result$1;
+    float result$2;
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        java.util.Map map$838 = new java.util.HashMap();
+        for (int i$841 = 0; i$841 < _myInput.size(); i$841++) {
+            java.lang.Float key$839 = null;
+            java.lang.Integer value$840 = null;
+            if (!_myInput.keyArray().isNullAt(i$841)) {
+                result$2 = ((float)(_myInput.keyArray().getInt(i$841)));
+                key$839 = result$2;
+            }
+            value$840 = _myInput.valueArray().getInt(i$841);
+            map$838.put(key$839, value$840);
+        }
+        result$1 = new org.apache.flink.table.data.GenericMapData(map$838);
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = null;
+    }
+    return result$1;
+
+     */
+    @Override
+    protected String generateCodeBlockInternal(
+            CodeGeneratorCastRule.Context context,
+            String inputTerm,
+            String returnVariable,
+            LogicalType inputLogicalType,
+            LogicalType targetLogicalType) {
+        final LogicalType innerInputKeyType;
+        final LogicalType innerInputValueType;
+
+        final LogicalType innerTargetKeyType;
+        final LogicalType innerTargetValueType;
+        if (inputLogicalType.is(LogicalTypeRoot.MULTISET)) {
+            innerInputKeyType = ((MultisetType) inputLogicalType).getElementType();
+            innerInputValueType = new IntType(false);
+            innerTargetKeyType = ((MultisetType) targetLogicalType).getElementType();
+            innerTargetValueType = new IntType(false);
+        } else {
+            innerInputKeyType = ((MapType) inputLogicalType).getKeyType();
+            innerInputValueType = ((MapType) inputLogicalType).getValueType();
+            innerTargetKeyType = ((MapType) targetLogicalType).getKeyType();
+            innerTargetValueType = ((MapType) targetLogicalType).getValueType();
+        }
+
+        final String innerTargetKeyTypeTerm = boxedTypeTermForType(innerTargetKeyType);
+        final String innerTargetValueTypeTerm = boxedTypeTermForType(innerTargetValueType);
+        final String keyArrayTerm = methodCall(inputTerm, "keyArray");
+        final String valueArrayTerm = methodCall(inputTerm, "valueArray");
+        final String size = methodCall(inputTerm, "size");
+        final String map = newName("map");
+        final String key = newName("key");
+        final String value = newName("value");
+
+        return new CastRuleUtils.CodeWriter()
+                .declStmt(className(Map.class), map, constructorCall(HashMap.class))
+                .forStmt(
+                        size,
+                        (index, codeWriter) -> {
+                            CastCodeBlock keyCodeBlock =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            rowFieldReadAccess(
+                                                    index, keyArrayTerm, innerInputKeyType),
+                                            "false",
+                                            // Null check is done at the array access level
+                                            innerInputKeyType.copy(false),
+                                            innerTargetKeyType);
+
+                            CastCodeBlock valueCodeBlock =
+                                    CastRuleProvider.generateCodeBlock(
+                                            context,
+                                            rowFieldReadAccess(
+                                                    index, valueArrayTerm, innerInputValueType),
+                                            "false",
+                                            // Null check is done at the array access level
+                                            innerInputValueType.copy(false),

Review comment:
       @slinkydeveloper didn't we change this behavior recently and use a new method that deals with `NullType`?



