You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/14 13:37:59 UTC

[flink] 02/03: [FLINK-22475][table-common] Exclude options with '#' placeholder from validation of required options

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6be47cc5baa098e12855f7727c6c4336298bef91
Author: Ingo Bürk <in...@tngtech.com>
AuthorDate: Tue May 11 15:48:06 2021 +0200

    [FLINK-22475][table-common] Exclude options with '#' placeholder from validation of required options
---
 .../java/org/apache/flink/table/factories/FactoryUtil.java  | 11 ++++++++++-
 .../org/apache/flink/table/factories/FactoryUtilTest.java   | 13 +++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index c038e79..c7c8c82 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -108,6 +108,12 @@ public final class FactoryUtil {
     public static final String FORMAT_SUFFIX = ".format";
 
     /**
+     * The placeholder symbol to be used for keys of options which can be templated. See {@link
+     * Factory} for details.
+     */
+    public static final String PLACEHOLDER_SYMBOL = "#";
+
+    /**
      * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
      *
      * <p>It considers {@link Catalog#getFactory()} if provided.
@@ -362,6 +368,9 @@ public final class FactoryUtil {
 
         final List<String> missingRequiredOptions =
                 requiredOptions.stream()
+                        // A templated key such as 'fields.#.min' never appears literally in the
+                        // supplied options, so it is excluded from the required-options check
+                        .filter(option -> !option.key().contains(PLACEHOLDER_SYMBOL))
                         .filter(option -> readOption(options, option) == null)
                         .map(ConfigOption::key)
                         .sorted()
@@ -384,7 +393,7 @@ public final class FactoryUtil {
             String factoryIdentifier, Set<String> allOptionKeys, Set<String> consumedOptionKeys) {
         final Set<String> remainingOptionKeys = new HashSet<>(allOptionKeys);
         remainingOptionKeys.removeAll(consumedOptionKeys);
-        if (remainingOptionKeys.size() > 0) {
+        if (!remainingOptionKeys.isEmpty()) {
             throw new ValidationException(
                     String.format(
                             "Unsupported options found for '%s'.\n\n"
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
index 8d523f2..47c3d98 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.table.factories;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
@@ -37,9 +40,11 @@ import org.junit.rules.ExpectedException;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
@@ -278,6 +283,14 @@ public class FactoryUtilTest {
     }
 
     @Test
+    public void testRequiredPlaceholderOption() {
+        final Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+        requiredOptions.add(ConfigOptions.key("fields.#.min").intType().noDefaultValue());
+
+        FactoryUtil.validateFactoryOptions(requiredOptions, new HashSet<>(), new Configuration());
+    }
+
+    @Test
     public void testCreateCatalog() {
         final Map<String, String> options = new HashMap<>();
         options.put(CommonCatalogOptions.CATALOG_TYPE.key(), TestCatalogFactory.IDENTIFIER);