Posted to issues@flink.apache.org by "liuyongvs (via GitHub)" <gi...@apache.org> on 2023/04/03 04:25:18 UTC

[GitHub] [flink] liuyongvs opened a new pull request, #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

liuyongvs opened a new pull request, #22324:
URL: https://github.com/apache/flink/pull/22324

   - What is the purpose of the change
   This is an implementation of MAP_FROM_ENTRIES
   
   - Brief change log
   MAP_FROM_ENTRIES for Table API and SQL
       
   ```
   map_from_entries(array_of_rows) - Returns a map created from an array of rows with two fields. Each row must have exactly 2 fields, and the key (the first field) of a row must not be null.
   
   Syntax:
   map_from_entries(array_of_rows)
   
   Arguments:
   array_of_rows: an array of rows with two fields.
   
   Returns:
   
   Returns a map created from an array of rows with two fields. Each row must have exactly 2 fields, and the key (the first field) of a row must not be null.
   
   Returns null if the argument is null.
   
   > SELECT map_from_entries(array[row(1, 'a'), row(2, 'b')]);
    {1=a, 2=b}
   ```
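The semantics described above can be sketched in plain Java. This is only an illustration, not the Flink implementation: the class name `MapFromEntriesSketch` is hypothetical, a `Map.Entry` stands in for a two-field row, and letting a later duplicate key overwrite an earlier one is an assumption that the description does not specify.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the documented MAP_FROM_ENTRIES semantics; not Flink code. */
final class MapFromEntriesSketch {

    /**
     * Builds a map from (key, value) pairs; each pair stands in for a two-field row.
     * Returns null for a null argument and rejects null keys, as the description states.
     */
    static <K, V> Map<K, V> mapFromEntries(List<? extends Map.Entry<K, V>> entries) {
        if (entries == null) {
            return null;
        }
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : entries) {
            if (entry.getKey() == null) {
                throw new IllegalArgumentException("map key must not be null");
            }
            // Assumption: a later duplicate key overwrites the earlier one.
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<SimpleEntry<Integer, String>> rows =
                List.of(new SimpleEntry<>(1, "a"), new SimpleEntry<>(2, "b"));
        System.out.println(mapFromEntries(rows)); // prints {1=a, 2=b}
    }
}
```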
   
   
   See also
   presto https://prestodb.io/docs/current/functions/map.html
   
   spark https://spark.apache.org/docs/latest/api/sql/index.html#map_from_entries
   
   - Verifying this change
   This change added tests in MapFunctionITCase.
   
   - Does this pull request potentially affect one of the following parts:
   Dependencies (does it add or upgrade a dependency): (no)
   The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
   The serializers: (no)
   The runtime per-record code paths (performance sensitive): (no)
   Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
   The S3 file system connector: (no)
   - Documentation
   Does this pull request introduce a new feature? (yes)
   If yes, how is the feature documented? (docs)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] liuyongvs closed pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs closed pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.
URL: https://github.com/apache/flink/pull/22324




[GitHub] [flink] liuyongvs commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1201649136


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -435,6 +435,16 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<MapKeyDedupPolicy> TABLE_EXEC_MAPKEY_DEDUP_POLICY =
+            key("table.exec.mapkey-dedup-policy")

Review Comment:
   I will discuss this on the mailing list later, thanks @snuyanzin





[GitHub] [flink] snuyanzin commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1201632409


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -435,6 +435,16 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<MapKeyDedupPolicy> TABLE_EXEC_MAPKEY_DEDUP_POLICY =
+            key("table.exec.mapkey-dedup-policy")

Review Comment:
   Besides `map_from_array` there are other ways to build a map, e.g. the `map` constructor
   and various casts, e.g.
   ```sql
   select cast(map['1', '2', ' 1', '2'] as map<int,int>);
   ```
   
   This looks like a major change, and it probably makes sense to discuss it first rather than introduce it silently (e.g. there is nothing about it in the corresponding JIRA issue).
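
Why a cast can raise the duplicate-key question is easier to see in a small plain-Java sketch. It is not Flink code: the class name `CastKeyCollisionSketch` is hypothetical, and it assumes the cast ignores surrounding whitespace, so that the string keys `'1'` and `' 1'` both become the integer key 1.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: casting string keys to INT can turn distinct keys into duplicates. */
final class CastKeyCollisionSketch {

    /**
     * Converts string keys and values to integers the way a trimming cast would.
     * Assumption: surrounding whitespace is ignored, so "1" and " 1" both become 1.
     * Returns the number of source entries whose key collided with an earlier key.
     */
    static int countCollisions(Map<String, String> source, Map<Integer, Integer> target) {
        int collisions = 0;
        for (Map.Entry<String, String> entry : source.entrySet()) {
            int key = Integer.parseInt(entry.getKey().trim());
            int value = Integer.parseInt(entry.getValue().trim());
            // put() returns the previous value when the key was already present.
            if (target.put(key, value) != null) {
                collisions++;
            }
        }
        return collisions;
    }

    public static void main(String[] args) {
        Map<String, String> source = new LinkedHashMap<>();
        source.put("1", "2");
        source.put(" 1", "2");
        Map<Integer, Integer> target = new LinkedHashMap<>();
        int collisions = countCollisions(source, target);
        System.out.println(collisions + " collision(s), result " + target);
        // prints 1 collision(s), result {1=2}
    }
}
```

The two source keys are distinct as strings, so a duplicate only appears after the cast, which is why a dedup policy would also have to cover this path.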





[GitHub] [flink] Aitozi commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "Aitozi (via GitHub)" <gi...@apache.org>.
Aitozi commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1165074229


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/MapFromEntriesInputTypeStrategy.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#MAP_FROM_ENTRIES}.
+ *
+ * <p>It checks if an argument is an array type of row with two fields.
+ */
+@Internal
+class MapFromEntriesInputTypeStrategy implements InputTypeStrategy {
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.of(1);
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            CallContext callContext, boolean throwOnFailure) {
+        final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
+
+        final DataType dataType = argumentDataTypes.get(0);
+        final LogicalType logicalType = dataType.getLogicalType();
+        if (logicalType.is(LogicalTypeRoot.ARRAY)
+                && ((ArrayType) logicalType).getElementType().is(LogicalTypeRoot.ROW)

Review Comment:
   Got it.





[GitHub] [flink] liuyongvs commented on pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on PR #22324:
URL: https://github.com/apache/flink/pull/22324#issuecomment-1549372250

   Hi @dawidwys, do you have time to have a look at this?




[GitHub] [flink] liuyongvs commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1201612234


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -435,6 +435,16 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<MapKeyDedupPolicy> TABLE_EXEC_MAPKEY_DEDUP_POLICY =
+            key("table.exec.mapkey-dedup-policy")

Review Comment:
   This is a job-level setting, as in Spark. Once this is merged, I will fix map_from_array as well: https://issues.apache.org/jira/browse/FLINK-31682 
   https://github.com/apache/spark/blob/7e2c6c7ab23f75a6ba83baaa0545482a43845ce8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala#L52





[GitHub] [flink] liuyongvs commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1166210706


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -435,6 +435,16 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<MapKeyDedupPolicy> TABLE_EXEC_MAPKEY_DEDUP_POLICY =
+            key("table.exec.mapkey-dedup-policy")

Review Comment:
   I also have a question: do we need to support a function-level strategy?
   1) A global setting makes the policy job-level; the user supplies this configuration option when running the job.
   2) With a function-level strategy there are 3 cases:
       1. every call uses last-win
       2. every call throws an exception
       3. some calls use last-win and others throw an exception. This is the only case where a function-level strategy makes a difference. But if any call that uses the throw-exception strategy sees a duplicate key, the job still throws an exception, so the effect is the same as when every call throws. So I think function-level support is not very meaningful. What do you think?
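
The mixed case in item 3 can be written down as a small plain-Java sketch. All names here (`MixedPolicySketch`, `Policy`, `jobFails`) are hypothetical and only model the reasoning in this comment; nothing is taken from the PR code.

```java
import java.util.List;

/** Sketch of the "some calls last-win, some calls throw" case from the comment above. */
final class MixedPolicySketch {

    /** Hypothetical names for the two strategies mentioned in the comment. */
    enum Policy { LAST_WIN, EXCEPTION }

    /**
     * A job fails if at least one call that uses EXCEPTION receives duplicate keys.
     * perCallPolicies.get(i) is the policy of call i; hasDuplicates.get(i) tells
     * whether the input of call i contains a duplicate key.
     */
    static boolean jobFails(List<Policy> perCallPolicies, List<Boolean> hasDuplicates) {
        for (int i = 0; i < perCallPolicies.size(); i++) {
            if (perCallPolicies.get(i) == Policy.EXCEPTION && hasDuplicates.get(i)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Policy> mixed = List.of(Policy.LAST_WIN, Policy.EXCEPTION);
        // A duplicate reaches the EXCEPTION call: same outcome as "every call throws".
        System.out.println(jobFails(mixed, List.of(false, true))); // prints true
        // A duplicate reaches only the LAST_WIN call: the job keeps running.
        System.out.println(jobFails(mixed, List.of(true, false))); // prints false
    }
}
```

Under this model the mixed configuration behaves like "every call throws" whenever a duplicate reaches a throwing call, and differs from it only when duplicates reach last-win calls alone.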
   





[GitHub] [flink] snuyanzin commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "snuyanzin (via GitHub)" <gi...@apache.org>.
snuyanzin commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1201573331


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -435,6 +435,16 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<MapKeyDedupPolicy> TABLE_EXEC_MAPKEY_DEDUP_POLICY =
+            key("table.exec.mapkey-dedup-policy")

Review Comment:
   Why should we have this, and why is this behavior tied to only one function?
   There are other `MAP`-related features.
   





[GitHub] [flink] liuyongvs commented on pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on PR #22324:
URL: https://github.com/apache/flink/pull/22324#issuecomment-1495357124

   Hi @snuyanzin, do you have time to review it?




[GitHub] [flink] liuyongvs commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1163759521


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -435,6 +435,16 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<MapKeyDedupPolicy> TABLE_EXEC_MAPKEY_DEDUP_POLICY =
+            key("table.exec.mapkey-dedup-policy")

Review Comment:
    I did a survey:
   1) MaxCompute supports an extra argument, so users can choose a different strategy for each call of map_from_entries in a job. https://www.alibabacloud.com/help/zh/maxcompute/latest/map-from-entries
   2) Spark supports this with a global config: https://github.com/apache/spark/blob/7e2c6c7ab23f75a6ba83baaa0545482a43845ce8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala#L52
   3) Presto doesn't support a mapkey-dedup-policy. Spark added this function later and does support one: https://issues.apache.org/jira/browse/SPARK-23934
   4) This function is not part of the SQL standard. The other collection functions I added follow the Spark way, so I did the same here.
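
The two scopes compared in this survey can be contrasted in a plain-Java sketch. It is an illustration under stated assumptions, not the API of Flink, Spark, or MaxCompute: `PolicyScopeSketch`, `Policy`, and both `mapFromEntries` overloads are hypothetical, and the constructor argument merely stands in for a job-level option such as `table.exec.mapkey-dedup-policy`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the two scopes discussed: a job-level default versus a per-call argument. */
final class PolicyScopeSketch {

    /** Hypothetical policy values, not taken from the PR. */
    enum Policy { LAST_WIN, EXCEPTION }

    // Stand-in for a job-level option such as table.exec.mapkey-dedup-policy.
    private final Policy jobLevelPolicy;

    PolicyScopeSketch(Policy jobLevelPolicy) {
        this.jobLevelPolicy = jobLevelPolicy;
    }

    /** Global-config shape: every call in the job uses the same policy. */
    Map<String, Integer> mapFromEntries(List<Map.Entry<String, Integer>> entries) {
        return mapFromEntries(entries, jobLevelPolicy);
    }

    /** Extra-argument shape: each call site chooses its own policy. */
    Map<String, Integer> mapFromEntries(
            List<Map.Entry<String, Integer>> entries, Policy policy) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : entries) {
            if (policy == Policy.EXCEPTION && result.containsKey(entry.getKey())) {
                throw new IllegalStateException("Duplicate map key: " + entry.getKey());
            }
            // With LAST_WIN the later value replaces the earlier one.
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> entries =
                List.of(Map.entry("k", 1), Map.entry("k", 2));
        PolicyScopeSketch job = new PolicyScopeSketch(Policy.EXCEPTION);
        // The per-call argument succeeds even though the job-level default would throw.
        System.out.println(job.mapFromEntries(entries, Policy.LAST_WIN)); // prints {k=2}
    }
}
```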





[GitHub] [flink] Aitozi commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "Aitozi (via GitHub)" <gi...@apache.org>.
Aitozi commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1163697823


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -435,6 +435,16 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<MapKeyDedupPolicy> TABLE_EXEC_MAPKEY_DEDUP_POLICY =
+            key("table.exec.mapkey-dedup-policy")

Review Comment:
   I'm curious why this option should be a global setting. How can a user apply different strategies to different calls of `map_from_entries` in a job? Can it be an extra argument of the function?





[GitHub] [flink] liuyongvs commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1201647742


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -435,6 +435,16 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<MapKeyDedupPolicy> TABLE_EXEC_MAPKEY_DEDUP_POLICY =
+            key("table.exec.mapkey-dedup-policy")

Review Comment:
   Yes, in Spark every way of building a map takes this into account.
   
   The map constructor is here, @snuyanzin:
   https://github.com/apache/spark/blob/eeab2e701330f7bc24e9b09ce48925c2c3265aa8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L184





[GitHub] [flink] flinkbot commented on pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "flinkbot (via GitHub)" <gi...@apache.org>.
flinkbot commented on PR #22324:
URL: https://github.com/apache/flink/pull/22324#issuecomment-1493647373

   ## CI report:
   
   * 8a534fe42ef69bc595fe9b87f1de924fdc173815 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] Aitozi commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "Aitozi (via GitHub)" <gi...@apache.org>.
Aitozi commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1164956399


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/MapFromEntriesInputTypeStrategy.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#MAP_FROM_ENTRIES}.
+ *
+ * <p>It checks if an argument is an array type of row with two fields.
+ */
+@Internal
+class MapFromEntriesInputTypeStrategy implements InputTypeStrategy {
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.of(1);
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            CallContext callContext, boolean throwOnFailure) {
+        final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
+
+        final DataType dataType = argumentDataTypes.get(0);
+        final LogicalType logicalType = dataType.getLogicalType();
+        if (logicalType.is(LogicalTypeRoot.ARRAY)
+                && ((ArrayType) logicalType).getElementType().is(LogicalTypeRoot.ROW)

Review Comment:
   Do we have to check that the first argument and the second argument are of the same type?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -435,6 +435,16 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following the legacy behaviour "
                                     + "or the new one that introduces various fixes and improvements.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<MapKeyDedupPolicy> TABLE_EXEC_MAPKEY_DEDUP_POLICY =
+            key("table.exec.mapkey-dedup-policy")

Review Comment:
   Thanks for your detailed explanation. 
   Personally, I prefer the argument solution here, as it seems more flexible. Looking forward to other reviewers' suggestions here.





[GitHub] [flink] liuyongvs commented on a diff in pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on code in PR #22324:
URL: https://github.com/apache/flink/pull/22324#discussion_r1165012228


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/MapFromEntriesInputTypeStrategy.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * {@link InputTypeStrategy} specific for {@link BuiltInFunctionDefinitions#MAP_FROM_ENTRIES}.
+ *
+ * <p>It checks if an argument is an array type of row with two fields.
+ */
+@Internal
+class MapFromEntriesInputTypeStrategy implements InputTypeStrategy {
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.of(1);
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            CallContext callContext, boolean throwOnFailure) {
+        final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
+
+        final DataType dataType = argumentDataTypes.get(0);
+        final LogicalType logicalType = dataType.getLogicalType();
+        if (logicalType.is(LogicalTypeRoot.ARRAY)
+                && ((ArrayType) logicalType).getElementType().is(LogicalTypeRoot.ROW)

Review Comment:
   No need. Because the element type is a row, its two fields can be of different types.
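
The check under discussion can be illustrated with a miniature type model in plain Java. This is a sketch, not Flink's `LogicalType` hierarchy: `EntriesTypeCheckSketch`, `Root`, and `Type` are hypothetical stand-ins that only capture "an array whose element is a row with two fields", without comparing the two field types.

```java
import java.util.List;

/** Sketch of the input check: ARRAY of ROW with exactly two fields of any types. */
final class EntriesTypeCheckSketch {

    /** Hypothetical miniature type roots; the real type system is far richer. */
    enum Root { INT, STRING, ROW, ARRAY }

    /** A type is a root plus child types (ROW: field types, ARRAY: one element type). */
    static final class Type {
        final Root root;
        final List<Type> children;

        Type(Root root, List<Type> children) {
            this.root = root;
            this.children = children;
        }

        static Type of(Root root, Type... children) {
            return new Type(root, List.of(children));
        }
    }

    /** Accepts ARRAY<ROW<f0, f1>>; the two field types are not compared with each other. */
    static boolean isArrayOfTwoFieldRows(Type type) {
        if (type.root != Root.ARRAY || type.children.size() != 1) {
            return false;
        }
        Type element = type.children.get(0);
        return element.root == Root.ROW && element.children.size() == 2;
    }

    public static void main(String[] args) {
        Type intType = Type.of(Root.INT);
        Type stringType = Type.of(Root.STRING);
        // Key and value types differ, which is fine for a map.
        Type entries = Type.of(Root.ARRAY, Type.of(Root.ROW, intType, stringType));
        System.out.println(isArrayOfTwoFieldRows(entries)); // prints true
        // An array of plain integers is rejected.
        System.out.println(isArrayOfTwoFieldRows(Type.of(Root.ARRAY, intType))); // prints false
    }
}
```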





[GitHub] [flink] liuyongvs commented on pull request #22324: [FLINK-31691][table] Add built-in MAP_FROM_ENTRIES function.

Posted by "liuyongvs (via GitHub)" <gi...@apache.org>.
liuyongvs commented on PR #22324:
URL: https://github.com/apache/flink/pull/22324#issuecomment-1557131797

   Hi @snuyanzin, do you have time to review it?

