You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/05/14 13:37:59 UTC
[flink] 02/03: [FLINK-22475][table-common] Exclude options with '#' placeholder from validation of required options
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6be47cc5baa098e12855f7727c6c4336298bef91
Author: Ingo Bürk <in...@tngtech.com>
AuthorDate: Tue May 11 15:48:06 2021 +0200
[FLINK-22475][table-common] Exclude options with '#' placeholder from validation of required options
---
.../java/org/apache/flink/table/factories/FactoryUtil.java | 11 ++++++++++-
.../org/apache/flink/table/factories/FactoryUtilTest.java | 13 +++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index c038e79..c7c8c82 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -108,6 +108,12 @@ public final class FactoryUtil {
public static final String FORMAT_SUFFIX = ".format";
/**
+ * The placeholder symbol to be used for keys of options which can be templated. See {@link
+ * Factory} for details.
+ */
+ public static final String PLACEHOLDER_SYMBOL = "#";
+
+ /**
* Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
*
* <p>It considers {@link Catalog#getFactory()} if provided.
@@ -362,6 +368,9 @@ public final class FactoryUtil {
final List<String> missingRequiredOptions =
requiredOptions.stream()
+ // Templated options will never appear with their template key, so we need
+ // to ignore them as required properties here
+ .filter(option -> !option.key().contains(PLACEHOLDER_SYMBOL))
.filter(option -> readOption(options, option) == null)
.map(ConfigOption::key)
.sorted()
@@ -384,7 +393,7 @@ public final class FactoryUtil {
String factoryIdentifier, Set<String> allOptionKeys, Set<String> consumedOptionKeys) {
final Set<String> remainingOptionKeys = new HashSet<>(allOptionKeys);
remainingOptionKeys.removeAll(consumedOptionKeys);
- if (remainingOptionKeys.size() > 0) {
+ if (!remainingOptionKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Unsupported options found for '%s'.\n\n"
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
index 8d523f2..47c3d98 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.table.factories;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
@@ -37,9 +40,11 @@ import org.junit.rules.ExpectedException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Consumer;
import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
@@ -278,6 +283,14 @@ public class FactoryUtilTest {
}
@Test
+ public void testRequiredPlaceholderOption() {
+ final Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+ requiredOptions.add(ConfigOptions.key("fields.#.min").intType().noDefaultValue());
+
+ FactoryUtil.validateFactoryOptions(requiredOptions, new HashSet<>(), new Configuration());
+ }
+
+ @Test
public void testCreateCatalog() {
final Map<String, String> options = new HashMap<>();
options.put(CommonCatalogOptions.CATALOG_TYPE.key(), TestCatalogFactory.IDENTIFIER);