You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/06 10:11:01 UTC

[GitHub] [flink] slinkydeveloper opened a new pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

slinkydeveloper opened a new pull request #18283:
URL: https://github.com/apache/flink/pull/18283


   ## What is the purpose of the change
   
   This PR adds the required new APIs to expose merging the options from the catalog table, when restoring a plan.
   
   
   ## Brief change log
   
   * Added `DynamicTableFactory.Context#mergeableOptions` to expose options from the catalog table.
   * Added `FormatFactory` as base interface for `DecodingFormatFactory` and `EncodingFormatFactory`. Added `FormatFactory#mergeableOptions()` to declaratively define which options can be merged from the catalog table.
   * Added `FactoryUtil#forwardOptions` to perform the merge
   * Added some validation code to verify the catalog table is correct
   * Show how to use the new APIs in json and connector-kafka
   
   ## Verifying this change
   
   Added tests in `FactoryUtil` to verify the merging process.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad75062f2a516ec05c3cecaa41cec671e2faebce Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5c58aa691ffceddb8fd4409bd57a163fa59d7961 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1007478134


   @twalthr all the comments have been addressed in the last commit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr closed pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #18283:
URL: https://github.com/apache/flink/pull/18283


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c7daca73465b5396a8efddbbdd75133410f77ef0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055) 
   * 9c09a873c7a7659910523ce39a5a87bad3500a13 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on a change in pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r780311184



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "

Review comment:
       +1




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5c58aa691ffceddb8fd4409bd57a163fa59d7961 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026) 
   * c7daca73465b5396a8efddbbdd75133410f77ef0 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5c58aa691ffceddb8fd4409bd57a163fa59d7961 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4ba5f9d62347ac14dab7e19c0d8a01229d9f5558 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111) 
   * ad75062f2a516ec05c3cecaa41cec671e2faebce Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1007478134


   @twalthr all the comments have been addressed in the last commit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r780295708



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "

Review comment:
       I like enriching table, and i'm changing it throughout this PR, but I wonder, just for exceptions messages, wouldn't it be better keep "catalog table" as it might be more explicative for the user? Or perhaps "enriching catalog table"? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on a change in pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r780333584



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -148,6 +148,14 @@ public String factoryIdentifier() {
     public DynamicTableSource createDynamicTableSource(Context context) {
         final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
+                getKeyDecodingFormat(helper);
+
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                getValueDecodingFormat(helper);
+
+        helper.validateExcept(PROPERTIES_PREFIX);

Review comment:
       This would validate the non-enriched options first. Then you merge but no validation happens again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c7daca73465b5396a8efddbbdd75133410f77ef0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055) 
   * 9c09a873c7a7659910523ce39a5a87bad3500a13 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9c09a873c7a7659910523ce39a5a87bad3500a13 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191",
       "triggerID" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad75062f2a516ec05c3cecaa41cec671e2faebce Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115) 
   * d240ffefb73bfbe9d97824b917a388be631891ba Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad75062f2a516ec05c3cecaa41cec671e2faebce Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115) 
   * d240ffefb73bfbe9d97824b917a388be631891ba UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191",
       "triggerID" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29196",
       "triggerID" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * cf957c3fd877aef981ee0307f5d8534d7f86ec84 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29196) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r780298551



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -519,6 +519,7 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
             return new FactoryUtil.DefaultDynamicTableContext(
                     context.getObjectIdentifier(),
                     context.getCatalogTable().copy(newOptions),

Review comment:
       My understanding is that this is adding for each used format an option called `schema-registry.subject`. From my understanding, this seems harmless... 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006443662


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 5c58aa691ffceddb8fd4409bd57a163fa59d7961 (Thu Jan 06 10:16:09 UTC 2022)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on a change in pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r779547250



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -519,6 +519,7 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
             return new FactoryUtil.DefaultDynamicTableContext(
                     context.getObjectIdentifier(),
                     context.getCatalogTable().copy(newOptions),

Review comment:
       side note: what is this method doing? could it be a problem for the upgrade story?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FormatFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** Base interface for {@link DecodingFormatFactory} and {@link EncodingFormatFactory}. */
+public interface FormatFactory extends Factory {

Review comment:
       add annotation

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -148,7 +148,15 @@ public String factoryIdentifier() {
     public DynamicTableSource createDynamicTableSource(Context context) {
         final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-        final ReadableConfig tableOptions = helper.getOptions();
+        final ReadableConfig tableOptions =
+                helper.forwardOptions(
+                                TOPIC,
+                                TOPIC_PATTERN,
+                                SCAN_STARTUP_MODE,
+                                SCAN_STARTUP_SPECIFIC_OFFSETS,
+                                SCAN_TOPIC_PARTITION_DISCOVERY,
+                                SCAN_STARTUP_TIMESTAMP_MILLIS)
+                        .getOptions();

Review comment:
       let's call getOptions() after calling `validate`. maybe we should not offer them as a return type of `forwardOptions`.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "

Review comment:
       also:
   ```
   while the enriching catalog table has it with
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.

Review comment:
       what is `originalMap`?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "
+                                        + "This is invalid, as either only the persisted plan table defines the format, "
+                                        + "or both the persisted plan table and the catalog table defines the same format",

Review comment:
       nit: dot at the end

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
##########
@@ -151,4 +151,15 @@ public String factoryIdentifier() {
         options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
         return options;
     }
+
+    @Override
+    public Set<ConfigOption<?>> mergeableOptions() {

Review comment:
       Call this `forwardOptions` similar to helper? Because those are not only mergeable but will be forwarded.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "
+                                        + "This is invalid, as either only the persisted plan table defines the format, "
+                                        + "or both the persisted plan table and the catalog table defines the same format",

Review comment:
       also:
   
   ```
   and enriching catalog table defines...
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "

Review comment:
       Let's synchronize the variable names with the exception. `resolvedIdentifierFromContextResolvedCatalogTable` -> `identifierFromPlan`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5c58aa691ffceddb8fd4409bd57a163fa59d7961 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9c09a873c7a7659910523ce39a5a87bad3500a13 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083) 
   * 4ba5f9d62347ac14dab7e19c0d8a01229d9f5558 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9c09a873c7a7659910523ce39a5a87bad3500a13 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083) 
   * 4ba5f9d62347ac14dab7e19c0d8a01229d9f5558 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4ba5f9d62347ac14dab7e19c0d8a01229d9f5558 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on a change in pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r779547250



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -519,6 +519,7 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
             return new FactoryUtil.DefaultDynamicTableContext(
                     context.getObjectIdentifier(),
                     context.getCatalogTable().copy(newOptions),

Review comment:
       side note: what is this method doing? could it be a problem for the upgrade story?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FormatFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** Base interface for {@link DecodingFormatFactory} and {@link EncodingFormatFactory}. */
+public interface FormatFactory extends Factory {

Review comment:
       add annotation

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -148,7 +148,15 @@ public String factoryIdentifier() {
     public DynamicTableSource createDynamicTableSource(Context context) {
         final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-        final ReadableConfig tableOptions = helper.getOptions();
+        final ReadableConfig tableOptions =
+                helper.forwardOptions(
+                                TOPIC,
+                                TOPIC_PATTERN,
+                                SCAN_STARTUP_MODE,
+                                SCAN_STARTUP_SPECIFIC_OFFSETS,
+                                SCAN_TOPIC_PARTITION_DISCOVERY,
+                                SCAN_STARTUP_TIMESTAMP_MILLIS)
+                        .getOptions();

Review comment:
       let's call getOptions() after calling `validate`. maybe we should not offer them as a return type of `forwardOptions`.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "

Review comment:
       also:
   ```
   while the enriching catalog table has it with
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.

Review comment:
       what is `originalMap`?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "
+                                        + "This is invalid, as either only the persisted plan table defines the format, "
+                                        + "or both the persisted plan table and the catalog table defines the same format",

Review comment:
       nit: dot at the end

##########
File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
##########
@@ -151,4 +151,15 @@ public String factoryIdentifier() {
         options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
         return options;
     }
+
+    @Override
+    public Set<ConfigOption<?>> mergeableOptions() {

Review comment:
       Call this `forwardOptions` similar to helper? Because those are not only mergeable but will be forwarded.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "
+                                        + "This is invalid, as either only the persisted plan table defines the format, "
+                                        + "or both the persisted plan table and the catalog table defines the same format",

Review comment:
       also:
   
   ```
   and enriching catalog table defines...
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "

Review comment:
       Let's synchronize the variable names with the exception. `resolvedIdentifierFromContextResolvedCatalogTable` -> `identifierFromPlan`?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "

Review comment:
       also:
   ```
   while the enriching table has it with
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "
+                                        + "This is invalid, as either only the persisted plan table defines the format, "
+                                        + "or both the persisted plan table and the catalog table defines the same format",

Review comment:
       also:
   
   ```
   and enriching table defines...
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "

Review comment:
       +1

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -148,6 +148,14 @@ public String factoryIdentifier() {
     public DynamicTableSource createDynamicTableSource(Context context) {
         final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
+                getKeyDecodingFormat(helper);
+
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                getValueDecodingFormat(helper);
+
+        helper.validateExcept(PROPERTIES_PREFIX);

Review comment:
       This would validate the non-enriched options first. Then you merge but no validation happens again.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr closed pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #18283:
URL: https://github.com/apache/flink/pull/18283


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191",
       "triggerID" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad75062f2a516ec05c3cecaa41cec671e2faebce Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115) 
   * d240ffefb73bfbe9d97824b917a388be631891ba Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4ba5f9d62347ac14dab7e19c0d8a01229d9f5558 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111) 
   * ad75062f2a516ec05c3cecaa41cec671e2faebce UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191",
       "triggerID" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad75062f2a516ec05c3cecaa41cec671e2faebce Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115) 
   * d240ffefb73bfbe9d97824b917a388be631891ba Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191) 
   * cf957c3fd877aef981ee0307f5d8534d7f86ec84 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191",
       "triggerID" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29196",
       "triggerID" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d240ffefb73bfbe9d97824b917a388be631891ba Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191) 
   * cf957c3fd877aef981ee0307f5d8534d7f86ec84 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29196) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191",
       "triggerID" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad75062f2a516ec05c3cecaa41cec671e2faebce Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115) 
   * d240ffefb73bfbe9d97824b917a388be631891ba Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191) 
   * cf957c3fd877aef981ee0307f5d8534d7f86ec84 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191",
       "triggerID" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad75062f2a516ec05c3cecaa41cec671e2faebce Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115) 
   * d240ffefb73bfbe9d97824b917a388be631891ba Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r780295708



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "

Review comment:
       I like enriching table, and i'm changing it throughout this PR, but I wonder, just for exceptions messages, wouldn't it be better keep "catalog table" as it might be more explicative for the user? Or perhaps "enriching catalog table"? 

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -519,6 +519,7 @@ private static boolean hasKafkaClientProperties(Map<String, String> tableOptions
             return new FactoryUtil.DefaultDynamicTableContext(
                     context.getObjectIdentifier(),
                     context.getCatalogTable().copy(newOptions),

Review comment:
       My understanding is that this is adding for each used format an option called `schema-registry.subject`. From my understanding, this seems harmless... 

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -148,7 +148,15 @@ public String factoryIdentifier() {
     public DynamicTableSource createDynamicTableSource(Context context) {
         final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-        final ReadableConfig tableOptions = helper.getOptions();
+        final ReadableConfig tableOptions =
+                helper.forwardOptions(
+                                TOPIC,
+                                TOPIC_PATTERN,
+                                SCAN_STARTUP_MODE,
+                                SCAN_STARTUP_SPECIFIC_OFFSETS,
+                                SCAN_TOPIC_PARTITION_DISCOVERY,
+                                SCAN_STARTUP_TIMESTAMP_MILLIS)
+                        .getOptions();

Review comment:
       `forwardOptions` doesn't return anything, it's just a fluent method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
twalthr commented on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006723044


   Sorry, I closed this by accident.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5c58aa691ffceddb8fd4409bd57a163fa59d7961 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026) 
   * c7daca73465b5396a8efddbbdd75133410f77ef0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c7daca73465b5396a8efddbbdd75133410f77ef0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on a change in pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r780229275



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "

Review comment:
       also:
   ```
   while the enriching table has it with
   ```

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOp
             return getFormatPrefix(formatOption, identifier);
         }
 
-        private ReadableConfig projectOptions(String formatPrefix) {
-            return new DelegatingConfiguration(allOptions, formatPrefix);
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> mergeableConfigOptions = formatFactory.mergeableOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, formatPrefix);
+            if (mergeableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromMergeableOptions =
+                    new DelegatingConfiguration(mergeableOptions, formatPrefix);
+
+            for (ConfigOption<?> option : mergeableConfigOptions) {
+                formatConfFromMergeableOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only if the original
+         * configuration contains the format config option. It will fail if there is a mismatch of
+         * the identifier between the format in originalMap and the one in catalog.
+         */
+        private void checkFormatIdentifierMatchesWithMergeableOptions(
+                ConfigOption<String> formatOption,
+                String resolvedIdentifierFromContextResolvedCatalogTable) {
+            Optional<String> identifierFromMergeableOptions =
+                    mergeableOptions.getOptional(formatOption);
+
+            if (!identifierFromMergeableOptions.isPresent()) {
+                return;
+            }
+
+            if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' specified, while the catalog table has it with value '%s'. "
+                                        + "This is invalid, as either only the persisted plan table defines the format, "
+                                        + "or both the persisted plan table and the catalog table defines the same format",

Review comment:
       also:
   
   ```
   and enriching table defines...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] slinkydeveloper commented on a change in pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
slinkydeveloper commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r780306921



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -148,7 +148,15 @@ public String factoryIdentifier() {
     public DynamicTableSource createDynamicTableSource(Context context) {
         final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-        final ReadableConfig tableOptions = helper.getOptions();
+        final ReadableConfig tableOptions =
+                helper.forwardOptions(
+                                TOPIC,
+                                TOPIC_PATTERN,
+                                SCAN_STARTUP_MODE,
+                                SCAN_STARTUP_SPECIFIC_OFFSETS,
+                                SCAN_TOPIC_PARTITION_DISCOVERY,
+                                SCAN_STARTUP_TIMESTAMP_MILLIS)
+                        .getOptions();

Review comment:
       `forwardOptions` doesn't return anything, it's just a fluent method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191",
       "triggerID" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad75062f2a516ec05c3cecaa41cec671e2faebce Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115) 
   * d240ffefb73bfbe9d97824b917a388be631891ba Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191) 
   * cf957c3fd877aef981ee0307f5d8534d7f86ec84 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #18283: [FLINK-25390][table] Add API to support merging options from catalog table

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #18283:
URL: https://github.com/apache/flink/pull/18283#issuecomment-1006442061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29026",
       "triggerID" : "5c58aa691ffceddb8fd4409bd57a163fa59d7961",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29055",
       "triggerID" : "c7daca73465b5396a8efddbbdd75133410f77ef0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29083",
       "triggerID" : "9c09a873c7a7659910523ce39a5a87bad3500a13",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29111",
       "triggerID" : "4ba5f9d62347ac14dab7e19c0d8a01229d9f5558",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115",
       "triggerID" : "ad75062f2a516ec05c3cecaa41cec671e2faebce",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191",
       "triggerID" : "d240ffefb73bfbe9d97824b917a388be631891ba",
       "triggerType" : "PUSH"
     }, {
       "hash" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29196",
       "triggerID" : "cf957c3fd877aef981ee0307f5d8534d7f86ec84",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad75062f2a516ec05c3cecaa41cec671e2faebce Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29115) 
   * d240ffefb73bfbe9d97824b917a388be631891ba Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29191) 
   * cf957c3fd877aef981ee0307f5d8534d7f86ec84 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=29196) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org