Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/30 09:13:29 UTC

[GitHub] [flink] twalthr opened a new pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

twalthr opened a new pull request #11959:
URL: https://github.com/apache/flink/pull/11959


   ## What is the purpose of the change
   
   This implements the new factory interfaces mentioned in FLIP-95 and adds new factory discovery utilities that can be used for FLIP-122 and future factories.
   
   It adds `TestDynamicTableFactory` and `TestFormatFactory` as reference implementations of the new factories.
   
   ## Brief change log
   
   See commit messages.
   
   ## Verifying this change
   
   This change added tests and can be verified by running `FactoryUtilTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] twalthr commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419928465



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSourceFactory factory = getDynamicTableFactory(
+				DynamicTableSourceFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSource(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a source for reading table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSink createTableSink(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSinkFactory factory = getDynamicTableFactory(
+				DynamicTableSinkFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSink(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a sink for writing table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The following example sketches the usage:
+	 * <pre>{@code
+	 * // in createDynamicTableSource()
+	 * helper = FactoryUtil.createTableFactoryHelper(this, context);
+	 * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix");
+	 * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix");
+	 * helper.validate();
+	 * ... // construct connector with discovered formats
+	 * }</pre>
+	 *
+	 * <p>Note: This utility checks for left-over options in the final step.
+	 */
+	public static TableFactoryHelper createTableFactoryHelper(

Review comment:
       The format logic is a bit different from the top-level factory. A format factory cannot check unconsumed option keys; it only has a read-only view through which it can access its own options. Otherwise we would need to change `ReadableConfig` to allow listing all options, and maybe also `DelegatingConfiguration`. Functionality-wise it doesn't make a difference: it is sufficient that only the top-level factory checks for unconsumed options, taking all nested factories into account.
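   The prefix-scoped, read-only view described here can be sketched in plain Java. This is a hypothetical illustration only (the class and method names are invented, and it is not Flink's actual `DelegatingConfiguration`):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a prefix-scoped, read-only options view.
// Not Flink's actual code; names are invented for illustration.
public class PrefixedViewSketch {

    // Returns an unmodifiable map of only the options under the given prefix,
    // with the prefix stripped: for prefix "format.", the key
    // "format.fail-on-missing-field" becomes "fail-on-missing-field".
    public static Map<String, String> prefixedView(Map<String, String> allOptions, String prefix) {
        Map<String, String> view = new HashMap<>();
        for (Map.Entry<String, String> e : allOptions.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                view.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return Collections.unmodifiableMap(view);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "kafka");
        options.put("format", "json");
        options.put("format.fail-on-missing-field", "true");
        // The format factory only sees its own stripped options; it cannot
        // enumerate or reject the connector's remaining keys.
        System.out.println(prefixedView(options, "format."));
    }
}
```

   The format factory can read everything under its prefix, but has no way to discover keys outside the view, which is why the unconsumed-key check has to live at the top level.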







[GitHub] [flink] wuchong commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419946245



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+	public static TableFactoryHelper createTableFactoryHelper(

Review comment:
       For example, if we have a Kafka table with the following options:
   
   ```sql
   'connector' = 'kafka',
   'topic' = 'test_topic',
   'format' = 'json',
   'format.fail-on-missing-field' = 'true',
   'format.invalid-option' = 'true'
   ```
   
    The Kafka connector factory doesn't declare `format.*` options in its `requiredOptions` and `optionalOptions`, so unconsumed options such as `format.invalid-option` should be validated by the format factory. Otherwise, users can write arbitrary format options.
   
    Besides, the current `TableFactoryHelper#validate` doesn't take format options into account, which means validation will fail if any format options exist.
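    The top-level subtraction of consumed keys can be sketched as follows. This is a hypothetical illustration (all names are invented), not Flink's actual `FactoryUtil` implementation:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of a top-level "unconsumed options" check.
// Not Flink's actual code; names are invented for illustration.
public class UnconsumedOptionsSketch {

    // Subtracts the keys declared by the connector factory and the
    // prefix-qualified keys declared by the discovered format factory from
    // the table options; whatever remains is unsupported.
    public static Set<String> findLeftoverKeys(
            Map<String, String> tableOptions,
            Set<String> connectorKeys,
            String formatPrefix,
            Set<String> formatKeys) {
        Set<String> leftover = new TreeSet<>();
        for (String key : tableOptions.keySet()) {
            boolean consumedByConnector = connectorKeys.contains(key);
            boolean consumedByFormat = key.startsWith(formatPrefix)
                && formatKeys.contains(key.substring(formatPrefix.length()));
            if (!consumedByConnector && !consumedByFormat) {
                leftover.add(key);
            }
        }
        return leftover;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "test_topic");
        options.put("format", "json");
        options.put("format.fail-on-missing-field", "true");
        options.put("format.invalid-option", "true");
        Set<String> connectorKeys = new HashSet<>(Arrays.asList("connector", "topic", "format"));
        Set<String> formatKeys = new HashSet<>(Arrays.asList("fail-on-missing-field"));
        // Only "format.invalid-option" survives the subtraction and is reported.
        System.out.println(findLeftoverKeys(options, connectorKeys, "format.", formatKeys));
    }
}
```

    With the Kafka options above, `format.invalid-option` is the only key that neither the connector's declared options nor the format factory's declared options consume, so it would be the one flagged.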







[GitHub] [flink] flinkbot edited a comment on pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11959:
URL: https://github.com/apache/flink/pull/11959#issuecomment-621719729


   ## CI report:
   
   * 61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=471) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419978370



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+	public static TableFactoryHelper createTableFactoryHelper(

Review comment:
       Sorry, I misunderstood the logic of `discoverOptionalFormatFactory`. Thanks for the explanation. I have no other concerns now.







[GitHub] [flink] twalthr commented on pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
twalthr commented on pull request #11959:
URL: https://github.com/apache/flink/pull/11959#issuecomment-623915765


   Thanks for the review @wuchong and @dawidwys. I updated the PR.





[GitHub] [flink] twalthr commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419926234



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+	public static TableFactoryHelper createTableFactoryHelper(
+			DynamicTableFactory factory,
+			DynamicTableFactory.Context context) {
+		return new TableFactoryHelper(factory, context);
+	}
+
+	/**
+	 * Discovers a factory using the given factory base class and identifier.
+	 *
+	 * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)},
+	 * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)},
+	 * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}
+	 * are not applicable.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends Factory> T discoverFactory(
+			ClassLoader classLoader,
+			Class<T> factoryClass,
+			String factoryIdentifier) {
+		final List<Factory> factories = discoverFactories(classLoader);
+
+		final List<Factory> foundFactories = factories.stream()
+			.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+			.collect(Collectors.toList());
+
+		if (foundFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factories that implement '%s' in the classpath.",
+					factoryClass.getName()));
+		}
+
+		final List<Factory> matchingFactories = foundFactories.stream()
+			.filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+			.collect(Collectors.toList());
+
+		if (matchingFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" +
+					"Available factory identifiers are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(Factory::factoryIdentifier)
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+		if (matchingFactories.size() > 1) {
+			throw new ValidationException(
+				String.format(
+					"Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" +
+					"Ambiguous factory classes are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					matchingFactories.stream()
+						.map(f -> f.getClass().getName())
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+
+		return (T) matchingFactories.get(0);
+	}
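To make the two-step filtering in `discoverFactory` concrete, here is a self-contained sketch that mimics its logic. The `Factory` interface and the sample factory classes below are simplified stand-ins for illustration, not the actual Flink types:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DiscoverySketch {

	interface Factory { String factoryIdentifier(); }
	interface SourceFactory extends Factory {}

	// Illustrative factories; names and identifiers are made up.
	static class KafkaSourceFactory implements SourceFactory {
		public String factoryIdentifier() { return "kafka"; }
	}
	static class FileSinkFactory implements Factory {
		public String factoryIdentifier() { return "filesystem"; }
	}

	// Mirrors FactoryUtil.discoverFactory: first filter by base class,
	// then by identifier, failing on zero or multiple matches.
	@SuppressWarnings("unchecked")
	static <T extends Factory> T discoverFactory(
			List<Factory> factories, Class<T> factoryClass, String identifier) {
		List<Factory> found = factories.stream()
			.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
			.collect(Collectors.toList());
		if (found.isEmpty()) {
			throw new IllegalStateException(
				"No factories implement " + factoryClass.getName());
		}
		List<Factory> matching = found.stream()
			.filter(f -> f.factoryIdentifier().equals(identifier))
			.collect(Collectors.toList());
		if (matching.isEmpty()) {
			throw new IllegalStateException(
				"No factory for identifier '" + identifier + "'");
		}
		if (matching.size() > 1) {
			throw new IllegalStateException(
				"Ambiguous factories for '" + identifier + "'");
		}
		return (T) matching.get(0);
	}

	public static void main(String[] args) {
		List<Factory> factories =
			Arrays.<Factory>asList(new KafkaSourceFactory(), new FileSinkFactory());
		SourceFactory f = discoverFactory(factories, SourceFactory.class, "kafka");
		System.out.println(f.factoryIdentifier()); // prints "kafka"
	}
}
```

Note that `FileSinkFactory` survives neither filter for `SourceFactory.class`, which is exactly why the error messages distinguish "no factory implements the class" from "no factory matches the identifier".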
+
+	/**
+	 * Validates the required and optional {@link ConfigOption}s of a factory.
+	 *
+	 * <p>Note: It does not check for left-over options.
+	 */
+	public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+		// currently Flink's options have no validation feature, which is why we access them eagerly
+		// to provoke a parsing error
+		factory.requiredOptions()
+			.forEach(option -> {
+				final Object value = readOption(options, option);
+				if (value == null) {
+					throw new ValidationException(
+						String.format(
+							"A value for required option '%s' is missing.\n\n" +
+							"Required options are:\n\n" +
+							"%s",
+							option.key(),
+							factory.requiredOptions().stream()
+								.map(ConfigOption::key)
+								.sorted()
+								.collect(Collectors.joining("\n"))));
+				}
+			});
+		factory.optionalOptions()
+			.forEach(option -> readOption(options, option));
+	}
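The eager-access strategy can be illustrated with a simplified stand-in: every required key must be present, and every supplied value is parsed up front so that a malformed value fails during validation rather than later at runtime. A plain `Map` and an integer parse replace Flink's `ReadableConfig` and typed `ConfigOption`s here:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class OptionValidationSketch {

	// Mimics validateFactoryOptions: both a missing required key and an
	// unparseable value surface as exceptions during validation.
	static void validate(Map<String, String> options, Set<String> requiredIntKeys) {
		for (String key : requiredIntKeys) {
			String value = options.get(key);
			if (value == null) {
				throw new IllegalArgumentException(
					"A value for required option '" + key + "' is missing.");
			}
			try {
				Integer.parseInt(value); // eager parse provokes the error early
			} catch (NumberFormatException e) {
				throw new IllegalArgumentException(
					"Invalid value for option '" + key + "'.", e);
			}
		}
	}

	public static void main(String[] args) {
		Map<String, String> options = new HashMap<>();
		options.put("scan.parallelism", "4");
		validate(options, Collections.singleton("scan.parallelism")); // passes

		options.put("scan.parallelism", "four");
		try {
			validate(options, Collections.singleton("scan.parallelism"));
		} catch (IllegalArgumentException e) {
			System.out.println(e.getMessage()); // prints "Invalid value for option 'scan.parallelism'."
		}
	}
}
```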
+
+	// --------------------------------------------------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	private static <T extends DynamicTableFactory> T getDynamicTableFactory(
+			Class<T> factoryClass,
+			@Nullable Catalog catalog,
+			DefaultDynamicTableContext context) {
+		// catalog factory has highest precedence
+		if (catalog != null) {
+			final Factory factory = catalog.getFactory()
+				.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+				.orElse(null);
+			if (factory != null) {
+				return (T) factory;
+			}
+		}
+
+		// fallback to factory discovery
+		final String connectorOption = context.getCatalogTable()
+			.getOptions()
+			.get(CONNECTOR.key());
+		if (connectorOption == null) {
+			throw new ValidationException(
+				String.format(
+					"Table options do not contain an option key '%s' for discovering a connector.",
+					CONNECTOR.key()));
+		}
+		try {
+			return discoverFactory(context.getClassLoader(), factoryClass, connectorOption);
+		} catch (ValidationException e) {
+			throw new ValidationException(
+				String.format(
+					"Cannot discover a connector using option '%s'.",
+					stringifyOption(CONNECTOR.key(), connectorOption)),
+				e);
+		}
+	}
+
+	private static List<Factory> discoverFactories(ClassLoader classLoader) {
+		try {
+			final List<Factory> result = new LinkedList<>();
+			ServiceLoader
+				.load(Factory.class, classLoader)
+				.iterator()
+				.forEachRemaining(result::add);
+			return result;
+		} catch (ServiceConfigurationError e) {
+			LOG.error("Could not load service provider for factories.", e);
+			throw new TableException("Could not load service provider for factories.", e);
+		}
+	}
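The discovery above relies on the standard `java.util.ServiceLoader` mechanism: implementations are registered in a `META-INF/services/<interface name>` file on the classpath. The following sketch reproduces the loading loop with an illustrative interface of its own; since no provider-configuration file is registered for it in this sketch, the result is simply empty:

```java
import java.util.LinkedList;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

public class ServiceLoaderSketch {

	public interface Factory { String factoryIdentifier(); }

	// Mirrors discoverFactories: collect all providers that the given class
	// loader finds for the Factory interface, failing loudly on broken
	// service registrations.
	static List<Factory> discoverFactories(ClassLoader classLoader) {
		try {
			final List<Factory> result = new LinkedList<>();
			ServiceLoader
				.load(Factory.class, classLoader)
				.iterator()
				.forEachRemaining(result::add);
			return result;
		} catch (ServiceConfigurationError e) {
			throw new IllegalStateException(
				"Could not load service provider for factories.", e);
		}
	}

	public static void main(String[] args) {
		// No META-INF/services entry for this interface exists here, so
		// the list is empty.
		System.out.println(
			discoverFactories(ServiceLoaderSketch.class.getClassLoader()).size()); // prints 0
	}
}
```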
+
+	private static String stringifyOption(String key, String value) {
+		return String.format(
+			"'%s'='%s'",
+			EncodingUtils.escapeSingleQuotes(key),
+			EncodingUtils.escapeSingleQuotes(value));
+	}
+
+	private static Configuration asConfiguration(Map<String, String> options) {
+		final Configuration configuration = new Configuration();
+		options.forEach(configuration::setString);
+		return configuration;
+	}
+
+	private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) {
+		try {
+			return options.get(option);
+		} catch (Throwable t) {
+			throw new ValidationException(String.format("Invalid value for option '%s'.", option.key()), t);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Helper utility for discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * @see #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+	 */
+	public static class TableFactoryHelper {
+
+		private final DynamicTableFactory tableFactory;
+
+		private final DynamicTableFactory.Context context;
+
+		private final Configuration allOptions;
+
+		private final Set<String> consumedOptionKeys;
+
+		private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory.Context context) {
+			this.tableFactory = tableFactory;
+			this.context = context;
+			this.allOptions = asConfiguration(context.getCatalogTable().getOptions());
+			this.consumedOptionKeys = new HashSet<>();
+			this.consumedOptionKeys.add(PROPERTY_VERSION.key());
+			this.consumedOptionKeys.add(CONNECTOR.key());
+			this.consumedOptionKeys.addAll(
+				tableFactory.requiredOptions().stream()
+					.map(ConfigOption::key)
+					.collect(Collectors.toSet()));
+			this.consumedOptionKeys.addAll(
+				tableFactory.optionalOptions().stream()
+					.map(ConfigOption::key)
+					.collect(Collectors.toSet()));
+		}
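The bookkeeping above enables the later left-over check, which reduces to a set difference between all table options and the consumed keys. A minimal sketch with plain collections (the option names are illustrative, not Flink API):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class LeftoverOptionsSketch {

	// Every option key that was neither declared by the factory nor consumed
	// by a discovered format is a left-over and should fail validation.
	static Set<String> leftoverKeys(Map<String, String> allOptions, Set<String> consumedKeys) {
		Set<String> leftovers = new TreeSet<>(allOptions.keySet()); // sorted for stable messages
		leftovers.removeAll(consumedKeys);
		return leftovers;
	}

	public static void main(String[] args) {
		Map<String, String> options = new HashMap<>();
		options.put("connector", "kafka");
		options.put("topic", "orders");
		options.put("tpic", "typo-key"); // unknown key, will be reported

		Set<String> consumed = new HashSet<>(Arrays.asList("connector", "topic"));
		System.out.println(leftoverKeys(options, consumed)); // prints [tpic]
	}
}
```

Reporting the sorted left-over keys is what turns a silently ignored typo like `tpic` into an actionable validation error.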
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option as factory identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalScanFormat(formatFactoryClass, formatOption, formatPrefix)
+				.orElseThrow(() ->
+					new ValidationException(
+						String.format("Could not find required scan format option '%s'.", formatOption.key())));
+		}
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option (if present) as factory
+		 * identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix)
+				.map(formatFactory -> {
+					try {
+						return formatFactory.createScanFormat(context, projectOptions(formatPrefix));

Review comment:
       The format receives only a projected view of the options. It does not verify unconsumed options itself.
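The "view" mentioned here corresponds to a prefix projection over the option map (Flink implements it with `DelegatingConfiguration`). A self-contained sketch of the idea, using a plain `Map` instead of the Flink classes:

```java
import java.util.HashMap;
import java.util.Map;

public class ProjectOptionsSketch {

	// Projects all options under a prefix, e.g. "value.format.", into a view
	// with the prefix stripped; the format factory only ever sees this view,
	// so it cannot check for left-over keys outside of it.
	static Map<String, String> projectOptions(Map<String, String> options, String prefix) {
		Map<String, String> projected = new HashMap<>();
		for (Map.Entry<String, String> e : options.entrySet()) {
			if (e.getKey().startsWith(prefix)) {
				projected.put(e.getKey().substring(prefix.length()), e.getValue());
			}
		}
		return projected;
	}

	public static void main(String[] args) {
		Map<String, String> options = new HashMap<>();
		options.put("connector", "kafka");
		options.put("value.format.fail-on-missing", "true");
		System.out.println(projectOptions(options, "value.format.")); // prints {fail-on-missing=true}
	}
}
```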




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11959:
URL: https://github.com/apache/flink/pull/11959#issuecomment-621719729


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=471",
       "triggerID" : "61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3f9a12ef063d5c65c3bad333955b6d6ebf79f0f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=606",
       "triggerID" : "f3f9a12ef063d5c65c3bad333955b6d6ebf79f0f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5945b29e8095d55185720b4e8eb7e1ce8149a4a7",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=637",
       "triggerID" : "5945b29e8095d55185720b4e8eb7e1ce8149a4a7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f3f9a12ef063d5c65c3bad333955b6d6ebf79f0f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=606) 
   * 5945b29e8095d55185720b4e8eb7e1ce8149a4a7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=637) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>








[GitHub] [flink] twalthr edited a comment on pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
twalthr edited a comment on pull request #11959:
URL: https://github.com/apache/flink/pull/11959#issuecomment-623915765


   Thanks for the review @wuchong and @dawidwys. I updated the PR. I will update the FLIP and inform the mailing list after this is merged.





[GitHub] [flink] twalthr commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419893421



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+/**
+ * Base interface for connector formats.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface is an intermediate representation before constructing the
+ * actual runtime implementation.
+ *
+ * <p>Formats can be distinguished along two dimensions:
+ * <ul>
+ *     <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}).
+ *     <li>Runtime implementation interface that is required (e.g. {@link DeserializationSchema} or
+ *     some bulk interface).</li>
+ * </ul>
+ *
+ * <p>A {@link DynamicTableFactory} can search for a format that is accepted by the connector.
+ *
+ * @see ScanFormat
+ * @see SinkFormat
+ *
+ * @param <I> underlying runtime interface
+ */
+@Internal
+public interface Format<I> {

Review comment:
       good point, it was required in an earlier version but not anymore










[GitHub] [flink] wuchong commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r418975276



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,

Review comment:
       IIUC, this method is called in `CatalogSourceTable` to create a table source from the catalog table. However, in `CatalogSourceTable`, or even the preceding `CatalogSchemaTable`, we can't get an instance of `Catalog`, because we don't want to reference the whole `Catalog` in tables. So this method may not be handy to use.







[GitHub] [flink] twalthr commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419917599



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSourceFactory factory = getDynamicTableFactory(
+				DynamicTableSourceFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSource(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a source for reading table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSink createTableSink(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSinkFactory factory = getDynamicTableFactory(
+				DynamicTableSinkFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSink(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a sink for writing table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The following example sketches the usage:
+	 * <pre>{@code
+	 * // in createDynamicTableSource()
+	 * helper = FactoryUtil.createTableFactoryHelper(this, context);
+	 * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix");
+	 * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix");
+	 * helper.validate();
+	 * ... // construct connector with discovered formats
+	 * }</pre>
+	 *
+	 * <p>Note: This utility checks for left-over options in the final step.
+	 */
+	public static TableFactoryHelper createTableFactoryHelper(
+			DynamicTableFactory factory,
+			DynamicTableFactory.Context context) {
+		return new TableFactoryHelper(factory, context);
+	}
+
+	/**
+	 * Discovers a factory using the given factory base class and identifier.
+	 *
+	 * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)}
+	 * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)},
+	 * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}
+	 * are not applicable.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends Factory> T discoverFactory(
+			ClassLoader classLoader,
+			Class<T> factoryClass,
+			String factoryIdentifier) {
+		final List<Factory> factories = discoverFactories(classLoader);
+
+		final List<Factory> foundFactories = factories.stream()
+			.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+			.collect(Collectors.toList());
+
+		if (foundFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factories that implement '%s' in the classpath.",
+					factoryClass.getName()));
+		}
+
+		final List<Factory> matchingFactories = foundFactories.stream()
+			.filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+			.collect(Collectors.toList());
+
+		if (matchingFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" +
+					"Available factory identifiers are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(Factory::factoryIdentifier)
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+		if (matchingFactories.size() > 1) {
+			throw new ValidationException(
+				String.format(
+					"Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" +
+					"Ambiguous factory classes are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(f -> f.getClass().getName())
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+
+		return (T) matchingFactories.get(0);
+	}
+
+	/**
+	 * Validates the required and optional {@link ConfigOption}s of a factory.
+	 *
+	 * <p>Note: It does not check for left-over options.
+	 */
+	public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+		// currently Flink's options have no validation feature which is why we access them eagerly
+		// to provoke a parsing error
+		factory.requiredOptions()
+			.forEach(option -> {

Review comment:
       Good idea, better than a couple of rounds of trial and error.
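
       The eager-access approach discussed here (reading every declared option once during validation so that type/parse errors surface early) can be illustrated without Flink types. This is a simplified sketch: a plain `Map` and a parser function stand in for `ReadableConfig#get` and typed `ConfigOption`s, and the name `readOption` mirrors the private helper in the diff.

```java
import java.util.Map;
import java.util.function.Function;

/** Illustrates provoking parse errors by eagerly reading each declared option. */
class EagerOptionRead {

	/**
	 * Parses the raw string for the given key, wrapping any parse failure
	 * into a validation-style exception. Returns null for absent keys.
	 */
	public static <T> T readOption(
			Map<String, String> options,
			String key,
			Function<String, T> parser) {
		final String raw = options.get(key);
		if (raw == null) {
			return null; // missing optional option
		}
		try {
			return parser.apply(raw);
		} catch (RuntimeException t) {
			throw new IllegalArgumentException(
				String.format("Invalid value for option '%s'.", key), t);
		}
	}
}
```

       Validation would simply call this once per required and optional option; a value like `"abc"` for an int option fails at validation time instead of deep inside the connector.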




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11959:
URL: https://github.com/apache/flink/pull/11959#issuecomment-621719729


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=471",
       "triggerID" : "61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=471) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419389063



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSourceFactory factory = getDynamicTableFactory(
+				DynamicTableSourceFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSource(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a source for reading table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSink createTableSink(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSinkFactory factory = getDynamicTableFactory(
+				DynamicTableSinkFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSink(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a sink for writing table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The following example sketches the usage:
+	 * <pre>{@code
+	 * // in createDynamicTableSource()
+	 * helper = FactoryUtil.createTableFactoryHelper(this, context);
+	 * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix");
+	 * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix");
+	 * helper.validate();
+	 * ... // construct connector with discovered formats
+	 * }</pre>
+	 *
+	 * <p>Note: This utility checks for left-over options in the final step.
+	 */
+	public static TableFactoryHelper createTableFactoryHelper(

Review comment:
       Shall we add a helper for `FormatFactory` too? Currently there is no validation utility for `ScanFormatFactory#createScanFormat` that also checks for unconsumed option keys.
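
       The left-over-key check such a format helper would need can be sketched without Flink types. This is a simplified illustration, not the Flink API: plain `Map`/`Set` stand in for `ReadableConfig` and the factory's declared `ConfigOption`s, and the name `validateUnconsumedKeys` is hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Simplified stand-in for a format-factory left-over option check. */
class FormatOptionsCheck {

	/**
	 * Throws if the given options contain keys that neither the required
	 * nor the optional option set of the factory declares.
	 */
	public static void validateUnconsumedKeys(
			String factoryIdentifier,
			Map<String, String> options,
			Set<String> declaredKeys) {
		final Set<String> unconsumed = new HashSet<>(options.keySet());
		unconsumed.removeAll(declaredKeys);
		if (!unconsumed.isEmpty()) {
			throw new IllegalArgumentException(
				String.format(
					"Unsupported options for format '%s':\n%s",
					factoryIdentifier,
					unconsumed.stream().sorted().collect(Collectors.joining("\n"))));
		}
	}
}
```

       A `FormatFactoryHelper` mirroring `TableFactoryHelper` could run this as its final `validate()` step over the projected format options.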

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSourceFactory factory = getDynamicTableFactory(
+				DynamicTableSourceFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSource(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a source for reading table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSink createTableSink(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSinkFactory factory = getDynamicTableFactory(
+				DynamicTableSinkFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSink(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a sink for writing table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The following example sketches the usage:
+	 * <pre>{@code
+	 * // in createDynamicTableSource()
+	 * helper = FactoryUtil.createTableFactoryHelper(this, context);
+	 * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix");
+	 * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix");
+	 * helper.validate();
+	 * ... // construct connector with discovered formats
+	 * }</pre>
+	 *
+	 * <p>Note: This utility checks for left-over options in the final step.
+	 */
+	public static TableFactoryHelper createTableFactoryHelper(
+			DynamicTableFactory factory,
+			DynamicTableFactory.Context context) {
+		return new TableFactoryHelper(factory, context);
+	}
+
+	/**
+	 * Discovers a factory using the given factory base class and identifier.
+	 *
+	 * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)},
+	 * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)},
+	 * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}
+	 * are not applicable.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends Factory> T discoverFactory(
+			ClassLoader classLoader,
+			Class<T> factoryClass,
+			String factoryIdentifier) {
+		final List<Factory> factories = discoverFactories(classLoader);
+
+		final List<Factory> foundFactories = factories.stream()
+			.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+			.collect(Collectors.toList());
+
+		if (foundFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factories that implement '%s' in the classpath.",
+					factoryClass.getName()));
+		}
+
+		final List<Factory> matchingFactories = foundFactories.stream()
+			.filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+			.collect(Collectors.toList());
+
+		if (matchingFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" +
+					"Available factory identifiers are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(Factory::factoryIdentifier)
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+		if (matchingFactories.size() > 1) {
+			throw new ValidationException(
+				String.format(
+					"Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" +
+					"Ambiguous factory classes are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(f -> f.getClass().getName())
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+
+		return (T) matchingFactories.get(0);
+	}
+
+	/**
+	 * Validates the required and optional {@link ConfigOption}s of a factory.
+	 *
+	 * <p>Note: It does not check for left-over options.
+	 */
+	public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+		// currently Flink's options have no validation feature which is why we access them eagerly
+		// to provoke a parsing error
+		factory.requiredOptions()
+			.forEach(option -> {
+				final Object value = readOption(options, option);
+				if (value == null) {
+					throw new ValidationException(
+						String.format(
+							"A value for required option '%s' is missing.\n\n" +
+							"Required options are:\n\n" +
+							"%s",
+							option.key(),
+							factory.requiredOptions().stream()
+								.map(ConfigOption::key)
+								.sorted()
+								.collect(Collectors.joining("\n"))));
+				}
+			});
+		factory.optionalOptions()
+			.forEach(option -> readOption(options, option));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	private static <T extends DynamicTableFactory> T getDynamicTableFactory(
+			Class<T> factoryClass,
+			@Nullable Catalog catalog,
+			DefaultDynamicTableContext context) {
+		// catalog factory has highest precedence
+		if (catalog != null) {
+			final Factory factory = catalog.getFactory()
+				.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+				.orElse(null);
+			if (factory != null) {
+				return (T) factory;
+			}
+		}
+
+		// fallback to factory discovery
+		final String connectorOption = context.getCatalogTable()
+			.getOptions()
+			.get(CONNECTOR.key());
+		if (connectorOption == null) {
+			throw new ValidationException(
+				String.format(
+					"Table options do not contain an option key '%s' for discovering a connector.",
+					CONNECTOR.key()));
+		}
+		try {
+			return discoverFactory(context.getClassLoader(), factoryClass, connectorOption);
+		} catch (ValidationException e) {
+			throw new ValidationException(
+				String.format(
+					"Cannot discover a connector using option '%s'.",
+					stringifyOption(CONNECTOR.key(), connectorOption)),
+				e);
+		}
+	}
+
+	private static List<Factory> discoverFactories(ClassLoader classLoader) {
+		try {
+			final List<Factory> result = new LinkedList<>();
+			ServiceLoader
+				.load(Factory.class, classLoader)
+				.iterator()
+				.forEachRemaining(result::add);
+			return result;
+		} catch (ServiceConfigurationError e) {
+			LOG.error("Could not load service provider for factories.", e);
+			throw new TableException("Could not load service provider for factories.", e);
+		}
+	}
+
+	private static String stringifyOption(String key, String value) {
+		return String.format(
+			"'%s'='%s'",
+			EncodingUtils.escapeSingleQuotes(key),
+			EncodingUtils.escapeSingleQuotes(value));
+	}
+
+	private static Configuration asConfiguration(Map<String, String> options) {
+		final Configuration configuration = new Configuration();
+		options.forEach(configuration::setString);
+		return configuration;
+	}
+
+	private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) {
+		try {
+			return options.get(option);
+		} catch (Throwable t) {
+			throw new ValidationException(String.format("Invalid value for option '%s'.", option.key()), t);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Helper utility for discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * @see #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+	 */
+	public static class TableFactoryHelper {
+
+		private final DynamicTableFactory tableFactory;
+
+		private final DynamicTableFactory.Context context;
+
+		private final Configuration allOptions;
+
+		private final Set<String> consumedOptionKeys;
+
+		private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory.Context context) {
+			this.tableFactory = tableFactory;
+			this.context = context;
+			this.allOptions = asConfiguration(context.getCatalogTable().getOptions());
+			this.consumedOptionKeys = new HashSet<>();
+			this.consumedOptionKeys.add(PROPERTY_VERSION.key());
+			this.consumedOptionKeys.add(CONNECTOR.key());
+			this.consumedOptionKeys.addAll(
+				tableFactory.requiredOptions().stream()
+					.map(ConfigOption::key)
+					.collect(Collectors.toSet()));
+			this.consumedOptionKeys.addAll(
+				tableFactory.optionalOptions().stream()
+					.map(ConfigOption::key)
+					.collect(Collectors.toSet()));
+		}
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option as factory identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalScanFormat(formatFactoryClass, formatOption, formatPrefix)
+				.orElseThrow(() ->
+					new ValidationException(
+						String.format("Could not find required scan format option '%s'.", formatOption.key())));
+		}
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option (if present) as factory
+		 * identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix)
+				.map(formatFactory -> {
+					try {
+						return formatFactory.createScanFormat(context, projectOptions(formatPrefix));

Review comment:
       If we are going to use `format.kind` as the format identifier key, then we must remove the `format.kind` key in `projectOptions`; otherwise, `kind` shows up as an unsupported option.
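
       The projection fix suggested here can be sketched without Flink types: strip the format prefix from each key but skip the key that carried the format identifier itself. Plain string maps stand in for `DelegatingConfiguration`, and the method name is only illustrative.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified sketch of prefix projection that excludes the identifier key. */
class FormatOptionProjection {

	/**
	 * Returns the options under the given prefix with the prefix stripped,
	 * excluding the key that carried the format identifier itself
	 * (e.g. "format.kind" must not surface as unsupported option "kind").
	 */
	public static Map<String, String> projectOptions(
			Map<String, String> allOptions,
			String formatPrefix,
			String identifierKey) {
		final Map<String, String> projected = new HashMap<>();
		for (Map.Entry<String, String> e : allOptions.entrySet()) {
			final String key = e.getKey();
			if (key.startsWith(formatPrefix) && !key.equals(identifierKey)) {
				projected.put(key.substring(formatPrefix.length()), e.getValue());
			}
		}
		return projected;
	}
}
```

       With this filtering, a left-over-key check on the projected options no longer trips over the identifier key the table factory already consumed.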







[GitHub] [flink] flinkbot edited a comment on pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11959:
URL: https://github.com/apache/flink/pull/11959#issuecomment-621719729


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=471",
       "triggerID" : "61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3f9a12ef063d5c65c3bad333955b6d6ebf79f0f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=606",
       "triggerID" : "f3f9a12ef063d5c65c3bad333955b6d6ebf79f0f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5945b29e8095d55185720b4e8eb7e1ce8149a4a7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5945b29e8095d55185720b4e8eb7e1ce8149a4a7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f3f9a12ef063d5c65c3bad333955b6d6ebf79f0f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=606) 
   * 5945b29e8095d55185720b4e8eb7e1ce8149a4a7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r418922875



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal

Review comment:
       Shall we mark `FactoryUtil` as `@PublicEvolving`? This class appears in many Javadocs of public interfaces.







[GitHub] [flink] twalthr commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419958777



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSourceFactory factory = getDynamicTableFactory(
+				DynamicTableSourceFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSource(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a source for reading table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSink createTableSink(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSinkFactory factory = getDynamicTableFactory(
+				DynamicTableSinkFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSink(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a sink for writing table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The following example sketches the usage:
+	 * <pre>{@code
+	 * // in createDynamicTableSource()
+	 * helper = FactoryUtil.createTableFactoryHelper(this, context);
+	 * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix");
+	 * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix");
+	 * helper.validate();
+	 * ... // construct connector with discovered formats
+	 * }</pre>
+	 *
+	 * <p>Note: This utility checks for left-over options in the final step.
+	 */
+	public static TableFactoryHelper createTableFactoryHelper(

Review comment:
       `TableFactoryHelper#validate` does take format options into account. In the current implementation, it doesn't matter where a typo is:
   
   ```
   fromat.fail-on-missing-field
   format.fial-on-missing-field
   ```
   
   In both cases the exception is the same: there are unconsumed options. Not every nested factory, and the upper level as well, must check for that; we can just do it once.
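   For illustration, the left-over check that makes both typos fail could be sketched like this (a hypothetical helper, not the actual `TableFactoryHelper` code):
   
   ```java
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   public class LeftOverCheckSketch {
   
       // Returns every option key that no factory (connector or nested format)
       // has consumed; a typo anywhere in the key ends up in this set.
       static Set<String> leftOverKeys(Map<String, String> allOptions, Set<String> consumedKeys) {
           final Set<String> remaining = new HashSet<>(allOptions.keySet());
           remaining.removeAll(consumedKeys);
           return remaining;
       }
   
       public static void main(String[] args) {
           Map<String, String> options = Map.of(
               "connector", "test",
               "fromat.fail-on-missing-field", "true"); // typo in the prefix
           Set<String> consumed = Set.of("connector", "format.fail-on-missing-field");
           // the misspelled key surfaces as an unconsumed option
           System.out.println(leftOverKeys(options, consumed));
       }
   }
   ```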




----------------------------------------------------------------



[GitHub] [flink] dawidwys commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
dawidwys commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r418938345



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
##########
@@ -48,6 +50,21 @@
 @PublicEvolving
 public interface Catalog {
 
+	/**
+	 * Returns a factory for creating instances from catalog objects.
+	 *
+	 * <p>This method enables bypassing the discovery process. Implementers can directly pass internal
+	 * catalog-specific objects to their own factory. For example, a custom {@link CatalogTable} can
+	 * be processed by a custom {@link DynamicTableFactory}.
+	 *
+	 * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement multiple
+	 * supported extension points. An {@code instanceof} check is performed by the caller that checks
+	 * whether a required factory is implemented; otherwise the discovery process is used.
+	 */
+	default Optional<Factory> getFactory() {

Review comment:
       Shall we deprecate the other `getTableFactory`? If not, can we describe the relationship between the two methods? How do they differ?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * A {@link Format} for a {@link DynamicTableSink}.
+ *
+ * @param <I> runtime interface needed by the table sink
+ */
+@Internal
+public interface SinkFormat<I> extends Format<I> {

Review comment:
       Shouldn't the format factories be also `PublicEvolving`?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
##########
@@ -28,13 +29,24 @@
  * key-value pairs defining the properties of the table.
  */
 public interface CatalogBaseTable {
+
 	/**
-	 * Get the properties of the table.
-	 *
-	 * @return property map of the table/view
+	 * @deprecated Use {@link #getOptions()}.
 	 */
+	@Deprecated
 	Map<String, String> getProperties();
 
+	/**
+	 * Returns a map of string-based options.
+	 *
+	 * <p>In case of {@link CatalogTable}, these options may determine the kind of connector and its

Review comment:
       nit: may or will?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+/**
+ * Base interface for connector formats.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface is an intermediate representation before constructing the
+ * actual runtime implementation.
+ *
+ * <p>Formats can be distinguished along two dimensions:
+ * <ul>
+ *     <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}).</li>
+ *     <li>Runtime implementation interface that is required (e.g. {@link DeserializationSchema} or
+ *     some bulk interface).</li>
+ * </ul>
+ *
+ * <p>A {@link DynamicTableFactory} can search for a format that is accepted by the connector.
+ *
+ * @see ScanFormat
+ * @see SinkFormat
+ *
+ * @param <I> underlying runtime interface
+ */
+@Internal
+public interface Format<I> {

Review comment:
       Is it necessary that this interface is generic? The generic parameter will never be used in this class imo. I think it makes more sense to have the generic on the `Scan/Sink` level.
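       For comparison, pushing the type parameter down to the scan/sink level would look roughly like this (simplified hypothetical signatures, not the merged code):
   
   ```java
   // Base interface without a type parameter; only the concrete
   // scan/sink variants expose the runtime interface they produce.
   interface Format {
   }
   
   interface ScanFormat<I> extends Format {
       I createRuntimeDecoder();
   }
   
   public class FormatGenericsSketch {
       public static void main(String[] args) {
           // a format whose "decoder" is a plain String, for illustration
           ScanFormat<String> csv = () -> "csv-decoder";
           Format base = csv; // still usable through the non-generic base interface
           System.out.println(csv.createRuntimeDecoder());
       }
   }
   ```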

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSourceFactory factory = getDynamicTableFactory(
+				DynamicTableSourceFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSource(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a source for reading table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSink createTableSink(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSinkFactory factory = getDynamicTableFactory(
+				DynamicTableSinkFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSink(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a sink for writing table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The following example sketches the usage:
+	 * <pre>{@code
+	 * // in createDynamicTableSource()
+	 * helper = FactoryUtil.createTableFactoryHelper(this, context);
+	 * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix");
+	 * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix");
+	 * helper.validate();
+	 * ... // construct connector with discovered formats
+	 * }</pre>
+	 *
+	 * <p>Note: This utility checks for left-over options in the final step.
+	 */
+	public static TableFactoryHelper createTableFactoryHelper(
+			DynamicTableFactory factory,
+			DynamicTableFactory.Context context) {
+		return new TableFactoryHelper(factory, context);
+	}
+
+	/**
+	 * Discovers a factory using the given factory base class and identifier.
+	 *
+	 * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)},
+	 * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)},
+	 * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}
+	 * are not applicable.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends Factory> T discoverFactory(
+			ClassLoader classLoader,
+			Class<T> factoryClass,
+			String factoryIdentifier) {
+		final List<Factory> factories = discoverFactories(classLoader);
+
+		final List<Factory> foundFactories = factories.stream()
+			.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+			.collect(Collectors.toList());
+
+		if (foundFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factories that implement '%s' in the classpath.",
+					factoryClass.getName()));
+		}
+
+		final List<Factory> matchingFactories = foundFactories.stream()
+			.filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+			.collect(Collectors.toList());
+
+		if (matchingFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" +
+					"Available factory identifiers are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(Factory::factoryIdentifier)
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+		if (matchingFactories.size() > 1) {
+			throw new ValidationException(
+				String.format(
+					"Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" +
+					"Ambiguous factory classes are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(f -> f.getClass().getName())
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+
+		return (T) matchingFactories.get(0);
+	}
+
+	/**
+	 * Validates the required and optional {@link ConfigOption}s of a factory.
+	 *
+	 * <p>Note: It does not check for left-over options.
+	 */
+	public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+		// currently Flink's options have no validation feature which is why we access them eagerly
+		// to provoke a parsing error
+		factory.requiredOptions()
+			.forEach(option -> {

Review comment:
       nit: Could we change it so that it checks all the missing required options? I think it would be a nice user experience if we list all missing options up front.
   It should be as simple as:
   
   ```
    List<String> missingRequiredOptions = factory.requiredOptions().stream()
        .filter(option -> readOption(options, option) == null)
        .map(ConfigOption::key)
        .collect(Collectors.toList());
   
   if (!missingRequiredOptions.isEmpty()) {
       throw new ValidationException(...);
   }
   ```
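   A runnable sketch of that approach (with a hypothetical `readOption` stand-in, since the real method consults a `ReadableConfig`):
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   public class MissingOptionsSketch {
   
       // Stand-in for FactoryUtil#readOption: returns null when the key is absent.
       static String readOption(Map<String, String> options, String key) {
           return options.get(key);
       }
   
       // Collects every missing required key so the exception can list all of them at once.
       static List<String> missingRequiredOptions(List<String> requiredKeys, Map<String, String> options) {
           return requiredKeys.stream()
               .filter(key -> readOption(options, key) == null)
               .sorted()
               .collect(Collectors.toList());
       }
   
       public static void main(String[] args) {
           Map<String, String> options = Map.of("connector", "test");
           List<String> missing = missingRequiredOptions(
               Arrays.asList("connector", "format", "topic"), options);
           System.out.println(missing); // [format, topic]
       }
   }
   ```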

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSourceFactory factory = getDynamicTableFactory(
+				DynamicTableSourceFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSource(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a source for reading table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSink createTableSink(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSinkFactory factory = getDynamicTableFactory(
+				DynamicTableSinkFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSink(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a sink for writing table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The following example sketches the usage:
+	 * <pre>{@code
+	 * // in createDynamicTableSource()
+	 * helper = FactoryUtil.createTableFactoryHelper(this, context);
+	 * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix");
+	 * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix");
+	 * helper.validate();
+	 * ... // construct connector with discovered formats
+	 * }</pre>
+	 *
+	 * <p>Note: This utility checks for left-over options in the final step.
+	 */
+	public static TableFactoryHelper createTableFactoryHelper(
+			DynamicTableFactory factory,
+			DynamicTableFactory.Context context) {
+		return new TableFactoryHelper(factory, context);
+	}
+
+	/**
+	 * Discovers a factory using the given factory base class and identifier.
+	 *
+	 * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)},
+	 * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)},
+	 * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}
+	 * are not applicable.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends Factory> T discoverFactory(
+			ClassLoader classLoader,
+			Class<T> factoryClass,
+			String factoryIdentifier) {
+		final List<Factory> factories = discoverFactories(classLoader);
+
+		final List<Factory> foundFactories = factories.stream()
+			.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+			.collect(Collectors.toList());
+
+		if (foundFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factories that implement '%s' in the classpath.",
+					factoryClass.getName()));
+		}
+
+		final List<Factory> matchingFactories = foundFactories.stream()
+			.filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+			.collect(Collectors.toList());
+
+		if (matchingFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" +
+					"Available factory identifiers are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(Factory::factoryIdentifier)
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+		if (matchingFactories.size() > 1) {
+			throw new ValidationException(
+				String.format(
+					"Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" +
+					"Ambiguous factory classes are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(f -> f.getClass().getName())
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+
+		return (T) matchingFactories.get(0);
+	}
+
+	/**
+	 * Validates the required and optional {@link ConfigOption}s of a factory.
+	 *
+	 * <p>Note: It does not check for left-over options.
+	 */
+	public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+		// currently Flink's options have no validation feature which is why we access them eagerly
+		// to provoke a parsing error
+		factory.requiredOptions()
+			.forEach(option -> {
+				final Object value = readOption(options, option);
+				if (value == null) {
+					throw new ValidationException(
+						String.format(
+							"A value for required option '%s' is missing.\n\n" +
+							"Required options are:\n\n" +
+							"%s",
+							option.key(),
+							factory.requiredOptions().stream()
+								.map(ConfigOption::key)
+								.sorted()
+								.collect(Collectors.joining("\n"))));
+				}
+			});
+		factory.optionalOptions()
+			.forEach(option -> readOption(options, option));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	private static <T extends DynamicTableFactory> T getDynamicTableFactory(
+			Class<T> factoryClass,
+			@Nullable Catalog catalog,
+			DefaultDynamicTableContext context) {
+		// catalog factory has highest precedence
+		if (catalog != null) {
+			final Factory factory = catalog.getFactory()
+				.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+				.orElse(null);
+			if (factory != null) {
+				return (T) factory;
+			}
+		}
+
+		// fallback to factory discovery
+		final String connectorOption = context.getCatalogTable()
+			.getOptions()
+			.get(CONNECTOR.key());
+		if (connectorOption == null) {
+			throw new ValidationException(
+				String.format(
+					"Table options do not contain an option key '%s' for discovering a connector.",
+					CONNECTOR.key()));
+		}
+		try {
+			return discoverFactory(context.getClassLoader(), factoryClass, connectorOption);
+		} catch (ValidationException e) {
+			throw new ValidationException(
+				String.format(
+					"Cannot discover a connector using option '%s'.",
+					stringifyOption(CONNECTOR.key(), connectorOption)),
+				e);
+		}
+	}
+
+	private static List<Factory> discoverFactories(ClassLoader classLoader) {
+		try {
+			final List<Factory> result = new LinkedList<>();
+			ServiceLoader
+				.load(Factory.class, classLoader)
+				.iterator()
+				.forEachRemaining(result::add);
+			return result;
+		} catch (ServiceConfigurationError e) {
+			LOG.error("Could not load service provider for factories.", e);
+			throw new TableException("Could not load service provider for factories.", e);
+		}
+	}
+
+	private static String stringifyOption(String key, String value) {
+		return String.format(
+			"'%s'='%s'",
+			EncodingUtils.escapeSingleQuotes(key),
+			EncodingUtils.escapeSingleQuotes(value));
+	}
+
+	private static Configuration asConfiguration(Map<String, String> options) {
+		final Configuration configuration = new Configuration();
+		options.forEach(configuration::setString);
+		return configuration;
+	}
+
+	private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) {
+		try {
+			return options.get(option);
+		} catch (Throwable t) {
+			throw new ValidationException(String.format("Invalid value for option '%s'.", option.key()), t);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Helper utility for discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * @see #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+	 */
+	public static class TableFactoryHelper {
+
+		private final DynamicTableFactory tableFactory;
+
+		private final DynamicTableFactory.Context context;
+
+		private final Configuration allOptions;
+
+		private final Set<String> consumedOptionKeys;
+
+		private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory.Context context) {
+			this.tableFactory = tableFactory;
+			this.context = context;
+			this.allOptions = asConfiguration(context.getCatalogTable().getOptions());
+			this.consumedOptionKeys = new HashSet<>();
+			this.consumedOptionKeys.add(PROPERTY_VERSION.key());
+			this.consumedOptionKeys.add(CONNECTOR.key());
+			this.consumedOptionKeys.addAll(
+				tableFactory.requiredOptions().stream()
+					.map(ConfigOption::key)
+					.collect(Collectors.toSet()));
+			this.consumedOptionKeys.addAll(
+				tableFactory.optionalOptions().stream()
+					.map(ConfigOption::key)
+					.collect(Collectors.toSet()));
+		}
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option as factory identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalScanFormat(formatFactoryClass, formatOption, formatPrefix)
+				.orElseThrow(() ->
+					new ValidationException(
+						String.format("Could not find required scan format option '%s'.", formatOption.key())));

Review comment:
       nit: drop the `option`. I think it would sound nicer if we simply say we could not find the format.
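
       For illustration, the reworded message could look like the sketch below. `missingFormatMessage` is a hypothetical helper used only to show the wording, not part of the PR:

```java
// Sketch of the suggested wording: say the format itself could not be found,
// rather than the "format option". The option key is kept for debuggability.
public class FormatErrorMessageSketch {

    static String missingFormatMessage(String optionKey) {
        return String.format("Could not find required scan format '%s'.", optionKey);
    }

    public static void main(String[] args) {
        System.out.println(missingFormatMessage("format"));
    }
}
```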

##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSinkMock;
+import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSourceMock;
+import org.apache.flink.table.factories.TestFormatFactory.ScanFormatMock;
+import org.apache.flink.table.factories.TestFormatFactory.SinkFormatMock;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FactoryUtil}.
+ */
+public class FactoryUtilTest {

Review comment:
       I like this test. It's easy to follow the test cases.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSourceFactory factory = getDynamicTableFactory(
+				DynamicTableSourceFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSource(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a source for reading table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSink createTableSink(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSinkFactory factory = getDynamicTableFactory(
+				DynamicTableSinkFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSink(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a sink for writing table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The following example sketches the usage:
+	 * <pre>{@code
+	 * // in createDynamicTableSource()
+	 * helper = FactoryUtil.createTableFactoryHelper(this, context);
+	 * keyFormat = helper.discoverScanFormat(MyFormatFactory.class, KEY_OPTION, "prefix");
+	 * valueFormat = helper.discoverScanFormat(MyFormatFactory.class, VALUE_OPTION, "prefix");
+	 * helper.validate();
+	 * ... // construct connector with discovered formats
+	 * }</pre>
+	 *
+	 * <p>Note: This utility checks for left-over options in the final step.
+	 */
+	public static TableFactoryHelper createTableFactoryHelper(
+			DynamicTableFactory factory,
+			DynamicTableFactory.Context context) {
+		return new TableFactoryHelper(factory, context);
+	}
+
+	/**
+	 * Discovers a factory using the given factory base class and identifier.
+	 *
+	 * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)},
+	 * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)},
+	 * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}
+	 * are not applicable.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends Factory> T discoverFactory(
+			ClassLoader classLoader,
+			Class<T> factoryClass,
+			String factoryIdentifier) {
+		final List<Factory> factories = discoverFactories(classLoader);
+
+		final List<Factory> foundFactories = factories.stream()
+			.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+			.collect(Collectors.toList());
+
+		if (foundFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factories that implement '%s' in the classpath.",
+					factoryClass.getName()));
+		}
+
+		final List<Factory> matchingFactories = foundFactories.stream()
+			.filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+			.collect(Collectors.toList());
+
+		if (matchingFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" +
+					"Available factory identifiers are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(Factory::factoryIdentifier)
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+		if (matchingFactories.size() > 1) {
+			throw new ValidationException(
+				String.format(
+					"Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" +
+					"Ambiguous factory classes are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(f -> f.getClass().getName())
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+
+		return (T) matchingFactories.get(0);
+	}
+
+	/**
+	 * Validates the required and optional {@link ConfigOption}s of a factory.
+	 *
+	 * <p>Note: It does not check for left-over options.
+	 */
+	public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+		// currently Flink's options have no validation feature which is why we access them eagerly
+		// to provoke a parsing error
+		factory.requiredOptions()
+			.forEach(option -> {
+				final Object value = readOption(options, option);
+				if (value == null) {
+					throw new ValidationException(
+						String.format(
+							"A value for required option '%s' is missing.\n\n" +
+							"Required options are:\n\n" +
+							"%s",
+							option.key(),
+							factory.requiredOptions().stream()
+								.map(ConfigOption::key)
+								.sorted()
+								.collect(Collectors.joining("\n"))));
+				}
+			});
+		factory.optionalOptions()
+			.forEach(option -> readOption(options, option));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	private static <T extends DynamicTableFactory> T getDynamicTableFactory(
+			Class<T> factoryClass,
+			@Nullable Catalog catalog,
+			DefaultDynamicTableContext context) {
+		// catalog factory has highest precedence
+		if (catalog != null) {
+			final Factory factory = catalog.getFactory()
+				.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+				.orElse(null);
+			if (factory != null) {
+				return (T) factory;
+			}
+		}
+
+		// fallback to factory discovery
+		final String connectorOption = context.getCatalogTable()
+			.getOptions()
+			.get(CONNECTOR.key());
+		if (connectorOption == null) {
+			throw new ValidationException(
+				String.format(
+					"Table options do not contain an option key '%s' for discovering a connector.",
+					CONNECTOR.key()));
+		}
+		try {
+			return discoverFactory(context.getClassLoader(), factoryClass, connectorOption);
+		} catch (ValidationException e) {
+			throw new ValidationException(
+				String.format(
+					"Cannot discover a connector using option '%s'.",
+					stringifyOption(CONNECTOR.key(), connectorOption)),
+				e);
+		}
+	}
+
+	private static List<Factory> discoverFactories(ClassLoader classLoader) {
+		try {
+			final List<Factory> result = new LinkedList<>();
+			ServiceLoader
+				.load(Factory.class, classLoader)
+				.iterator()
+				.forEachRemaining(result::add);
+			return result;
+		} catch (ServiceConfigurationError e) {
+			LOG.error("Could not load service provider for factories.", e);
+			throw new TableException("Could not load service provider for factories.", e);
+		}
+	}
+
+	private static String stringifyOption(String key, String value) {
+		return String.format(
+			"'%s'='%s'",
+			EncodingUtils.escapeSingleQuotes(key),
+			EncodingUtils.escapeSingleQuotes(value));
+	}
+
+	private static Configuration asConfiguration(Map<String, String> options) {
+		final Configuration configuration = new Configuration();
+		options.forEach(configuration::setString);
+		return configuration;
+	}
+
+	private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) {
+		try {
+			return options.get(option);
+		} catch (Throwable t) {
+			throw new ValidationException(String.format("Invalid value for option '%s'.", option.key()), t);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Helper utility for discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * @see #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+	 */
+	public static class TableFactoryHelper {
+
+		private final DynamicTableFactory tableFactory;
+
+		private final DynamicTableFactory.Context context;
+
+		private final Configuration allOptions;
+
+		private final Set<String> consumedOptionKeys;
+
+		private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory.Context context) {
+			this.tableFactory = tableFactory;
+			this.context = context;
+			this.allOptions = asConfiguration(context.getCatalogTable().getOptions());
+			this.consumedOptionKeys = new HashSet<>();
+			this.consumedOptionKeys.add(PROPERTY_VERSION.key());
+			this.consumedOptionKeys.add(CONNECTOR.key());
+			this.consumedOptionKeys.addAll(
+				tableFactory.requiredOptions().stream()
+					.map(ConfigOption::key)
+					.collect(Collectors.toSet()));
+			this.consumedOptionKeys.addAll(
+				tableFactory.optionalOptions().stream()
+					.map(ConfigOption::key)
+					.collect(Collectors.toSet()));
+		}
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option as factory identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalScanFormat(formatFactoryClass, formatOption, formatPrefix)
+				.orElseThrow(() ->
+					new ValidationException(
+						String.format("Could not find required scan format option '%s'.", formatOption.key())));
+		}
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option (if present) as factory
+		 * identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix)
+				.map(formatFactory -> {
+					try {
+						return formatFactory.createScanFormat(context, projectOptions(formatPrefix));
+					} catch (Throwable t) {
+						throw new ValidationException(
+							String.format(
+								"Error creating scan format '%s' in option space '%s'.",
+								formatFactory.factoryIdentifier(),
+								formatPrefix),
+							t);
+					}
+				});
+		}
+
+		/**
+		 * Discovers a {@link SinkFormat} of the given type using the given option as factory identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends SinkFormatFactory<I>> SinkFormat<I> discoverSinkFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalSinkFormat(formatFactoryClass, formatOption, formatPrefix)
+				.orElseThrow(() ->
+					new ValidationException(
+						String.format("Could not find required sink format option '%s'.", formatOption.key())));

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #11959:
URL: https://github.com/apache/flink/pull/11959#issuecomment-621715626


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca (Thu Apr 30 09:16:51 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
     The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11959:
URL: https://github.com/apache/flink/pull/11959#issuecomment-621719729


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=471",
       "triggerID" : "61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f3f9a12ef063d5c65c3bad333955b6d6ebf79f0f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=606",
       "triggerID" : "f3f9a12ef063d5c65c3bad333955b6d6ebf79f0f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f3f9a12ef063d5c65c3bad333955b6d6ebf79f0f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=606) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r418923036



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+/**
+ * Base interface for connector formats.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface is an intermediate representation before constructing actual
+ * runtime implementation.
+ *
+ * <p>Formats can be distinguished along two dimensions:
+ * <ul>
+ *     <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}).
+ *     <li>Runtime implementation interface that is required (e.g. {@link DeserializationSchema} or
+ *     some bulk interface).</li>
+ * </ul>
+ *
+ * <p>A {@link DynamicTableFactory} can search for a format that is accepted by the connector.
+ *
+ * @see ScanFormat
+ * @see SinkFormat
+ *
+ * @param <I> underlying runtime interface
+ */
+@Internal
+public interface Format<I> {
+
+	/**
+	 * Returns the set of changes that a connector (and transitively the planner) can expect during
+	 * runtime.
+	 */
+	ChangelogMode createChangelogMode();

Review comment:
       `getChangelogMode()` ?
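
       A minimal sketch of what the rename would look like, with stubbed types rather than the actual Flink classes:

```java
// Getter-style naming as suggested: the changelog mode is a fixed property of
// the format, not something newly constructed on each call. ChangelogMode is
// stubbed here; in Flink it is org.apache.flink.table.connector.ChangelogMode.
public class ChangelogModeNamingSketch {

    enum ChangelogMode { INSERT_ONLY, ALL }

    interface Format<I> {
        ChangelogMode getChangelogMode();
    }

    public static void main(String[] args) {
        Format<String> jsonFormat = () -> ChangelogMode.INSERT_ONLY;
        System.out.println(jsonFormat.getChangelogMode());
    }
}
```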







[GitHub] [flink] flinkbot commented on pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #11959:
URL: https://github.com/apache/flink/pull/11959#issuecomment-621719729


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61bfafbd073abc1f9ceaf2e9b375cb0735bcd6ca UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r418975276



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,

Review comment:
       IIUC, this method is called in `CatalogSourceTable` to create a table source from the catalog table. However, in `CatalogSourceTable`, or even the preceding `CatalogSchemaTable`, we can't get an instance of `Catalog`, because we don't want to reference the whole `Catalog` in tables. So this method may not be handy to use.
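
       For context, the utility under discussion resolves a factory by matching the `connector` option against factory identifiers discovered via `ServiceLoader`. A minimal, self-contained sketch of that lookup pattern follows; the class and method names here are illustrative, not the actual Flink API (Flink would load candidates with `ServiceLoader.load(Factory.class)` and take the identifier from the table options):

       ```java
       import java.util.Arrays;
       import java.util.List;
       import java.util.stream.Collectors;

       // Sketch of identifier-based factory discovery, mirroring the
       // uniqueness validation a utility like FactoryUtil would perform.
       public class FactoryDiscoveryDemo {

           interface Factory {
               String factoryIdentifier();
           }

           static class KafkaFactory implements Factory {
               public String factoryIdentifier() { return "kafka"; }
           }

           static class FilesystemFactory implements Factory {
               public String factoryIdentifier() { return "filesystem"; }
           }

           // Resolve exactly one factory for the given identifier; fail on
           // missing or ambiguous matches.
           static Factory discover(List<Factory> candidates, String identifier) {
               List<Factory> matches = candidates.stream()
                   .filter(f -> f.factoryIdentifier().equals(identifier))
                   .collect(Collectors.toList());
               if (matches.isEmpty()) {
                   throw new IllegalArgumentException("No factory found for: " + identifier);
               }
               if (matches.size() > 1) {
                   throw new IllegalArgumentException("Ambiguous factory for: " + identifier);
               }
               return matches.get(0);
           }

           public static void main(String[] args) {
               List<Factory> candidates =
                   Arrays.asList(new KafkaFactory(), new FilesystemFactory());
               // Resolves to the KafkaFactory instance.
               System.out.println(discover(candidates, "kafka").factoryIdentifier());
           }
       }
       ```

       The point of the discussion above is orthogonal to this lookup: it is about which context objects (the `Catalog`, an identifier) the call sites can actually supply.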







[GitHub] [flink] wuchong commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419947291



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,

Review comment:
       I don't have a strong opinion on this. 







[GitHub] [flink] twalthr commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419892167



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
##########
@@ -28,13 +29,24 @@
  * key-value pairs defining the properties of the table.
  */
 public interface CatalogBaseTable {
+
 	/**
-	 * Get the properties of the table.
-	 *
-	 * @return property map of the table/view
+	 * @deprecated Use {@link #getOptions()}.
 	 */
+	@Deprecated
 	Map<String, String> getProperties();
 
+	/**
+	 * Returns a map of string-based options.
+	 *
+	 * <p>In case of {@link CatalogTable}, these options may determine the kind of connector and its

Review comment:
       `may`, because if the `Catalog` provides its own factory, these options might not be used
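
       The options map in question is a flat map of string key-value pairs; by convention, the `connector` key identifies the connector, and format options are carved out by key prefix (`format.`, `key.format.`, `value.format.`, as defined by the constants in the quoted diff). A self-contained sketch of that prefix convention (the helper name is illustrative; Flink's `DelegatingConfiguration` serves a similar purpose):

       ```java
       import java.util.HashMap;
       import java.util.Map;

       // Sketch: a flat options map carrying both the connector identifier
       // and prefixed format options.
       public class TableOptionsDemo {

           // Extract sub-options under a prefix, with the prefix stripped.
           static Map<String, String> projectPrefix(Map<String, String> options, String prefix) {
               Map<String, String> result = new HashMap<>();
               for (Map.Entry<String, String> e : options.entrySet()) {
                   if (e.getKey().startsWith(prefix)) {
                       result.put(e.getKey().substring(prefix.length()), e.getValue());
                   }
               }
               return result;
           }

           public static void main(String[] args) {
               Map<String, String> options = new HashMap<>();
               options.put("connector", "kafka");
               options.put("format.type", "json");
               options.put("format.fail-on-missing-field", "false");

               System.out.println(options.get("connector")); // kafka
               // The format factory only sees the stripped sub-options:
               System.out.println(projectPrefix(options, "format.").get("type")); // json
           }
       }
       ```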







[GitHub] [flink] twalthr commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419918897



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal

Review comment:
       I discussed this with @dawidwys. We are fine with marking both the formats and the util as `PublicEvolving`. I think we have put enough thought into the new design, and it can help users reduce boilerplate code.







[GitHub] [flink] twalthr commented on a change in pull request #11959: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #11959:
URL: https://github.com/apache/flink/pull/11959#discussion_r419925365



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@Internal
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,

Review comment:
       It should not be too hard to add a reference to the `Catalog` to those classes. We also need to pass an `ObjectIdentifier` for the context. We could also look up the catalog again using the `FlinkContext` and the object identifier, if that would simplify the design. Can you elaborate on why you don't want that?
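
       For readers unfamiliar with the class being referenced: an `ObjectIdentifier` fully qualifies a table as `catalog.database.object`, so the catalog name alone is enough to look the `Catalog` instance up again in a catalog manager. A minimal sketch of that idea (this is an illustrative stand-in, not the actual `org.apache.flink.table.catalog.ObjectIdentifier`):

       ```java
       // Sketch of a fully qualified object identifier (catalog.database.object).
       public final class ObjectIdentifierDemo {

           private final String catalogName;
           private final String databaseName;
           private final String objectName;

           ObjectIdentifierDemo(String catalogName, String databaseName, String objectName) {
               this.catalogName = catalogName;
               this.databaseName = databaseName;
               this.objectName = objectName;
           }

           // The catalog name is what a planner context would need to
           // re-resolve the Catalog instance.
           String getCatalogName() {
               return catalogName;
           }

           String asSummaryString() {
               return String.join(".", catalogName, databaseName, objectName);
           }

           public static void main(String[] args) {
               ObjectIdentifierDemo id =
                   new ObjectIdentifierDemo("default_catalog", "default_database", "orders");
               System.out.println(id.asSummaryString()); // default_catalog.default_database.orders
           }
       }
       ```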



