You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/05 18:48:54 UTC

[flink] 03/03: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fef974d7df1a9dabd62f3288c086185d69298bc1
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Apr 30 11:03:47 2020 +0200

    [FLINK-16997][table-common] Add new factory interfaces and discovery utilities
    
    Implements the new factory interfaces mentioned in FLIP-95. Adds new factory utilities
    that can be used for FLIP-122 and future factories.
    
    It adds TestDynamicTableFactory and TestFormatFactory for reference implementations
    of new factories.
    
    This closes #11959.
---
 .../table/tests/test_catalog_completeness.py       |   1 +
 .../org/apache/flink/table/catalog/Catalog.java    |  20 +
 .../flink/table/connector/format/Format.java       |  55 ++
 .../flink/table/connector/format/ScanFormat.java   |  37 ++
 .../flink/table/connector/format/SinkFormat.java   |  38 ++
 .../factories/DeserializationFormatFactory.java    |  34 ++
 .../flink/table/factories/DynamicTableFactory.java |  73 +++
 .../table/factories/DynamicTableSinkFactory.java   |  42 ++
 .../table/factories/DynamicTableSourceFactory.java |  42 ++
 .../org/apache/flink/table/factories/Factory.java  |  79 +++
 .../apache/flink/table/factories/FactoryUtil.java  | 570 +++++++++++++++++++++
 .../flink/table/factories/ScanFormatFactory.java   |  50 ++
 .../factories/SerializationFormatFactory.java      |  34 ++
 .../flink/table/factories/SinkFormatFactory.java   |  50 ++
 .../flink/table/factories/FactoryUtilTest.java     | 285 +++++++++++
 .../table/factories/TestDynamicTableFactory.java   | 257 ++++++++++
 .../flink/table/factories/TestFormatFactory.java   | 180 +++++++
 .../org.apache.flink.table.factories.Factory       |  17 +
 18 files changed, 1864 insertions(+)

diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index 3e83357..245a2be 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -43,6 +43,7 @@ class CatalogAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCa
         return {
             'open',
             'close',
+            'getFactory',
             'getTableFactory',
             'getFunctionDefinitionFactory',
             'listPartitionsByFilter'}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index b4ff5f3..1e4c482 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -35,6 +35,8 @@ import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 
@@ -49,11 +51,29 @@ import java.util.Optional;
 public interface Catalog {
 
 	/**
+	 * Returns a factory for creating instances from catalog objects.
+	 *
+	 * <p>This method enables bypassing the discovery process. Implementers can directly pass internal
+	 * catalog-specific objects to their own factory. For example, a custom {@link CatalogTable} can
+	 * be processed by a custom {@link DynamicTableFactory}.
+	 *
+	 * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement multiple
+	 * supported extension points. An {@code instanceof} check is performed by the caller that checks
+	 * whether a required factory is implemented; otherwise the discovery process is used.
+	 */
+	default Optional<Factory> getFactory() {
+		return Optional.empty();
+	}
+
+	/**
 	 * Get an optional {@link TableFactory} instance that's responsible for generating table-related
 	 * instances stored in this catalog, instances such as source/sink.
 	 *
 	 * @return an optional TableFactory instance
+	 * @deprecated Use {@link #getFactory()} for the new factory stack. The new factory stack uses the
+	 *             new table sources and sinks defined in FLIP-95 and a slightly different discovery mechanism.
 	 */
+	@Deprecated
 	default Optional<TableFactory> getTableFactory() {
 		return Optional.empty();
 	}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
new file mode 100644
index 0000000..40c0115
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+/**
+ * Base interface for connector formats.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface is an intermediate representation before constructing the
+ * actual runtime implementation.
+ *
+ * <p>Formats can be distinguished along two dimensions:
+ * <ul>
+ *     <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}).</li>
+ *     <li>Runtime implementation interface that is required (e.g. {@link DeserializationSchema} or
+ *     some bulk interface).</li>
+ * </ul>
+ *
+ * <p>A {@link DynamicTableFactory} can search for a format that is accepted by the connector.
+ *
+ * @see ScanFormat
+ * @see SinkFormat
+ */
+@PublicEvolving
+public interface Format {
+
+	/**
+	 * Returns the set of changes that a connector (and transitively the planner) can expect during
+	 * runtime.
+	 */
+	ChangelogMode getChangelogMode();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
new file mode 100644
index 0000000..529ea37
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * A {@link Format} for a {@link ScanTableSource}.
+ *
+ * @param <I> runtime interface needed by the table source
+ */
+@PublicEvolving
+public interface ScanFormat<I> extends Format {
+
+	/**
+	 * Creates the runtime implementation that is configured to produce data of the given data type.
+	 */
+	I createScanFormat(ScanTableSource.Context context, DataType producedDataType);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
new file mode 100644
index 0000000..0b67336
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * A {@link Format} for a {@link DynamicTableSink}.
+ * <p>NOTE(review): {@code createSinkFormat} takes a source-side {@link ScanTableSource.Context}; confirm intent.
+ * @param <I> runtime interface needed by the table sink
+ */
+@PublicEvolving
+public interface SinkFormat<I> extends Format {
+
+	/**
+	 * Creates the runtime implementation that is configured to consume data of the given data type.
+	 */
+	I createSinkFormat(ScanTableSource.Context context, DataType consumedDataType);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
new file mode 100644
index 0000000..1901fc5
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Factory for creating a {@link ScanFormat} for {@link DeserializationSchema}.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@PublicEvolving
+public interface DeserializationFormatFactory extends ScanFormatFactory<DeserializationSchema<RowData>> {
+  // interface is used for discovery but is already fully specified by the generics
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
new file mode 100644
index 0000000..0d4f8ba
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+
+/**
+ * Base interface for configuring a dynamic table connector for an external storage system from catalog
+ * and session information.
+ *
+ * <p>Dynamic tables are the core concept of Flink's Table &amp; SQL API for processing both bounded and
+ * unbounded data in a unified fashion.
+ *
+ * <p>Implement {@link DynamicTableSourceFactory} for constructing a {@link DynamicTableSource}.
+ *
+ * <p>Implement {@link DynamicTableSinkFactory} for constructing a {@link DynamicTableSink}.
+ *
+ * <p>The options {@link FactoryUtil#PROPERTY_VERSION} and {@link FactoryUtil#CONNECTOR} are implicitly
+ * added and must not be declared.
+ */
+@PublicEvolving
+public interface DynamicTableFactory extends Factory {
+
+	/**
+	 * Provides catalog and session information describing the dynamic table to be accessed.
+	 */
+	interface Context {
+
+		/**
+		 * Returns the identifier of the table in the {@link Catalog}.
+		 */
+		ObjectIdentifier getObjectIdentifier();
+
+		/**
+		 * Returns table information received from the {@link Catalog}.
+		 */
+		CatalogTable getCatalogTable();
+
+		/**
+		 * Gives read-only access to the configuration of the current session.
+		 */
+		ReadableConfig getConfiguration();
+
+		/**
+		 * Returns the class loader of the current session.
+		 *
+		 * <p>The class loader is in particular useful for discovering further (nested) factories.
+		 */
+		ClassLoader getClassLoader();
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java
new file mode 100644
index 0000000..7f428fe
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Creates a {@link DynamicTableSink} instance from a {@link CatalogTable} and additional context
+ * information.
+ *
+ * <p>See {@link Factory} for more information about the general design of a factory.
+ */
+@PublicEvolving
+public interface DynamicTableSinkFactory extends DynamicTableFactory {
+
+	/**
+	 * Creates a {@link DynamicTableSink} instance from a {@link CatalogTable} and additional context
+	 * information.
+	 *
+	 * <p>An implementation should perform validation and the discovery of further (nested) factories
+	 * in this method.
+	 */
+	DynamicTableSink createDynamicTableSink(Context context);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java
new file mode 100644
index 0000000..3c0ecd1
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+
+/**
+ * Creates a {@link DynamicTableSource} instance from a {@link CatalogTable} and additional context
+ * information.
+ *
+ * <p>See {@link Factory} for more information about the general design of a factory.
+ */
+@PublicEvolving
+public interface DynamicTableSourceFactory extends DynamicTableFactory {
+
+	/**
+	 * Creates a {@link DynamicTableSource} instance from a {@link CatalogTable} and additional context
+	 * information.
+	 *
+	 * <p>An implementation should perform validation and the discovery of further (nested) factories
+	 * in this method.
+	 */
+	DynamicTableSource createDynamicTableSource(Context context);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java
new file mode 100644
index 0000000..b669e76
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.util.Set;
+
+/**
+ * Base interface for all kinds of factories that create object instances from a list of key-value pairs
+ * in Flink's Table &amp; SQL API.
+ *
+ * <p>A factory is uniquely identified by {@link Class} and {@link #factoryIdentifier()}.
+ *
+ * <p>The list of available factories is discovered using Java's Service Provider Interfaces (SPI). Classes
+ * that implement this interface can be added to {@code META-INF/services/org.apache.flink.table.factories.Factory}
+ * in JAR files.
+ *
+ * <p>Every factory declares a set of required and optional options. This information will not be used
+ * during discovery but is helpful when generating documentation and performing validation. A factory may
+ * discover further (nested) factories; the options of the nested factories must not be declared in the
+ * sets of this factory.
+ *
+ * <p>It is the responsibility of each factory to perform validation before returning an instance.
+ *
+ * <p>For consistency, the following style for key names of {@link ConfigOption} is recommended:
+ * <ul>
+ *     <li>Try to <b>reuse</b> key names as much as possible. Use other factory implementations as an example.
+ *     <li>Key names should be declared in <b>lower case</b>. Use "-" instead of dots or camel case to split words.
+ *     <li>Key names should be <b>hierarchical</b> where appropriate. Think about how one would define such
+ *     a hierarchy in JSON or YAML file (e.g. {@code sink.bulk-flush.max-actions}).
+ *     <li>In case of a hierarchy, try not to use the higher level again in the key name (e.g. do {@code sink.partitioner}
+ *     instead of {@code sink.sink-partitioner}) to <b>keep the keys short</b>.
+ * </ul>
+ */
+@PublicEvolving
+public interface Factory {
+
+	/**
+	 * Returns a unique identifier among same factory interfaces.
+	 *
+	 * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code kafka}). If
+	 * multiple factories exist for different versions, a version should be appended using "-" (e.g. {@code kafka-0.10}).
+	 */
+	String factoryIdentifier();
+
+	/**
+	 * Returns a set of {@link ConfigOption} that an implementation of this factory requires in addition to
+	 * {@link #optionalOptions()}.
+	 *
+	 * <p>See the documentation of {@link Factory} for more information.
+	 */
+	Set<ConfigOption<?>> requiredOptions();
+
+	/**
+	 * Returns a set of {@link ConfigOption} that an implementation of this factory consumes in addition to
+	 * {@link #requiredOptions()}.
+	 *
+	 * <p>See the documentation of {@link Factory} for more information.
+	 */
+	Set<ConfigOption<?>> optionalOptions();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
new file mode 100644
index 0000000..fa42b8f
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@PublicEvolving
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided. Failures surface as a {@link ValidationException}.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSourceFactory factory = getDynamicTableFactory(
+				DynamicTableSourceFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSource(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a source for reading table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided. Failures surface as a {@link ValidationException}.
+	 */
+	public static DynamicTableSink createTableSink(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSinkFactory factory = getDynamicTableFactory(
+				DynamicTableSinkFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSink(context);
+		} catch (Throwable t) {
+			throw new ValidationException(
+				String.format(
+					"Unable to create a sink for writing table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The following example sketches the usage:
+	 * <pre>{@code
+	 * // in createDynamicTableSource()
+	 * helper = FactoryUtil.createTableFactoryHelper(this, context);
+	 * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix");
+	 * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix");
+	 * helper.validate();
+	 * ... // construct connector with discovered formats
+	 * }</pre>
+	 *
+	 * <p>Note: This utility checks for left-over options in the final step.
+	 */
+	public static TableFactoryHelper createTableFactoryHelper(
+			DynamicTableFactory factory,
+			DynamicTableFactory.Context context) {
+		return new TableFactoryHelper(factory, context);
+	}
+
+	/**
+	 * Discovers a factory by base class and identifier; throws {@link ValidationException} unless exactly one matches.
+	 *
+	 * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)},
+	 * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)},
+	 * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}
+	 * are not applicable.
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends Factory> T discoverFactory(
+			ClassLoader classLoader,
+			Class<T> factoryClass,
+			String factoryIdentifier) {
+		final List<Factory> factories = discoverFactories(classLoader);
+
+		final List<Factory> foundFactories = factories.stream()
+			.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+			.collect(Collectors.toList());
+
+		if (foundFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factories that implement '%s' in the classpath.",
+					factoryClass.getName()));
+		}
+
+		final List<Factory> matchingFactories = foundFactories.stream()
+			.filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+			.collect(Collectors.toList());
+
+		if (matchingFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" +
+					"Available factory identifiers are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(Factory::factoryIdentifier)
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+		if (matchingFactories.size() > 1) {
+			throw new ValidationException(
+				String.format(
+					"Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" +
+					"Ambiguous factory classes are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					matchingFactories.stream()
+						.map(f -> f.getClass().getName())
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+
+		return (T) matchingFactories.get(0);
+	}
+
+	/**
+	 * Validates the required and optional {@link ConfigOption}s of a factory.
+	 *
+	 * <p>Every declared option is read once so that an unparsable value is reported. Required options
+	 * that resolve to {@code null} are collected and reported together.
+	 *
+	 * <p>Note: It does not check for left-over options.
+	 */
+	public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+		// Flink's options currently have no validation feature, so each option is accessed
+		// eagerly to provoke a parsing error
+		final List<String> missingKeys = new LinkedList<>();
+		for (ConfigOption<?> requiredOption : factory.requiredOptions()) {
+			if (readOption(options, requiredOption) == null) {
+				missingKeys.add(requiredOption.key());
+			}
+		}
+
+		if (!missingKeys.isEmpty()) {
+			missingKeys.sort(String::compareTo);
+			throw new ValidationException(
+				String.format(
+					"One or more required options are missing.\n\n" +
+					"Missing required options are:\n\n" +
+					"%s",
+					String.join("\n", missingKeys)));
+		}
+
+		for (ConfigOption<?> optionalOption : factory.optionalOptions()) {
+			readOption(options, optionalOption);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Determines the table factory of the requested kind.
+	 *
+	 * <p>A factory offered by the catalog wins if it implements {@code factoryClass}. Otherwise the
+	 * factory is discovered using the value of the {@code CONNECTOR} table option as identifier.
+	 */
+	@SuppressWarnings("unchecked")
+	private static <T extends DynamicTableFactory> T getDynamicTableFactory(
+			Class<T> factoryClass,
+			@Nullable Catalog catalog,
+			DefaultDynamicTableContext context) {
+		// catalog factory has highest precedence
+		if (catalog != null) {
+			final Factory catalogFactory = catalog.getFactory()
+				.filter(factoryClass::isInstance)
+				.orElse(null);
+			if (catalogFactory != null) {
+				return (T) catalogFactory;
+			}
+		}
+
+		// fallback to factory discovery
+		final Map<String, String> tableOptions = context.getCatalogTable().getOptions();
+		final String connectorIdentifier = tableOptions.get(CONNECTOR.key());
+		if (connectorIdentifier == null) {
+			final String message = String.format(
+				"Table options do not contain an option key '%s' for discovering a connector.",
+				CONNECTOR.key());
+			throw new ValidationException(message);
+		}
+
+		try {
+			return discoverFactory(context.getClassLoader(), factoryClass, connectorIdentifier);
+		} catch (ValidationException cause) {
+			final String message = String.format(
+				"Cannot discover a connector using option '%s'.",
+				stringifyOption(CONNECTOR.key(), connectorIdentifier));
+			throw new ValidationException(message, cause);
+		}
+	}
+
+	private static List<Factory> discoverFactories(ClassLoader classLoader) {
+		try {
+			final List<Factory> result = new LinkedList<>();
+			ServiceLoader
+				.load(Factory.class, classLoader)
+				.iterator()
+				.forEachRemaining(result::add);
+			return result;
+		} catch (ServiceConfigurationError e) {
+			LOG.error("Could not load service provider for factories.", e);
+			throw new TableException("Could not load service provider for factories.", e);
+		}
+	}
+
+	private static String stringifyOption(String key, String value) {
+		return String.format(
+			"'%s'='%s'",
+			EncodingUtils.escapeSingleQuotes(key),
+			EncodingUtils.escapeSingleQuotes(value));
+	}
+
+	/**
+	 * Copies the given string options into a new {@link Configuration}.
+	 */
+	private static Configuration asConfiguration(Map<String, String> options) {
+		final Configuration result = new Configuration();
+		for (Map.Entry<String, String> option : options.entrySet()) {
+			result.setString(option.getKey(), option.getValue());
+		}
+		return result;
+	}
+
+	private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) {
+		try {
+			return options.get(option);
+		} catch (Throwable t) {
+			throw new ValidationException(String.format("Invalid value for option '%s'.", option.key()), t);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Helper utility for discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The helper records the option keys declared by the table factory and by every discovered
+	 * format factory. {@link #validate()} rejects table options outside of that set.
+	 *
+	 * @see #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+	 */
+	public static class TableFactoryHelper {
+
+		private final DynamicTableFactory tableFactory;
+
+		private final DynamicTableFactory.Context context;
+
+		private final Configuration allOptions;
+
+		private final Set<String> consumedOptionKeys;
+
+		private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory.Context context) {
+			this.tableFactory = tableFactory;
+			this.context = context;
+			this.allOptions = asConfiguration(context.getCatalogTable().getOptions());
+
+			// keys that are always accepted plus everything the table factory declares
+			final Set<String> declaredKeys = new HashSet<>();
+			declaredKeys.add(PROPERTY_VERSION.key());
+			declaredKeys.add(CONNECTOR.key());
+			for (ConfigOption<?> option : tableFactory.requiredOptions()) {
+				declaredKeys.add(option.key());
+			}
+			for (ConfigOption<?> option : tableFactory.optionalOptions()) {
+				declaredKeys.add(option.key());
+			}
+			this.consumedOptionKeys = declaredKeys;
+		}
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option as factory identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 *
+		 * @throws ValidationException if the format option is not set
+		 */
+		public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			final Optional<ScanFormat<I>> scanFormat =
+				discoverOptionalScanFormat(formatFactoryClass, formatOption, formatPrefix);
+			if (!scanFormat.isPresent()) {
+				throw new ValidationException(
+					String.format("Could not find required scan format '%s'.", formatOption.key()));
+			}
+			return scanFormat.get();
+		}
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option (if present) as factory
+		 * identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			final Optional<F> discoveredFactory =
+				discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix);
+			if (!discoveredFactory.isPresent()) {
+				return Optional.empty();
+			}
+
+			final F formatFactory = discoveredFactory.get();
+			try {
+				// ofNullable: a factory that returns null yields an empty result
+				return Optional.ofNullable(
+					formatFactory.createScanFormat(context, projectOptions(formatPrefix)));
+			} catch (Throwable t) {
+				throw new ValidationException(
+					String.format(
+						"Error creating scan format '%s' in option space '%s'.",
+						formatFactory.factoryIdentifier(),
+						formatPrefix),
+					t);
+			}
+		}
+
+		/**
+		 * Discovers a {@link SinkFormat} of the given type using the given option as factory identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 *
+		 * @throws ValidationException if the format option is not set
+		 */
+		public <I, F extends SinkFormatFactory<I>> SinkFormat<I> discoverSinkFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			final Optional<SinkFormat<I>> sinkFormat =
+				discoverOptionalSinkFormat(formatFactoryClass, formatOption, formatPrefix);
+			if (!sinkFormat.isPresent()) {
+				throw new ValidationException(
+					String.format("Could not find required sink format '%s'.", formatOption.key()));
+			}
+			return sinkFormat.get();
+		}
+
+		/**
+		 * Discovers a {@link SinkFormat} of the given type using the given option (if present) as factory
+		 * identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends SinkFormatFactory<I>> Optional<SinkFormat<I>> discoverOptionalSinkFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			final Optional<F> discoveredFactory =
+				discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix);
+			if (!discoveredFactory.isPresent()) {
+				return Optional.empty();
+			}
+
+			final F formatFactory = discoveredFactory.get();
+			try {
+				// ofNullable: a factory that returns null yields an empty result
+				return Optional.ofNullable(
+					formatFactory.createSinkFormat(context, projectOptions(formatPrefix)));
+			} catch (Throwable t) {
+				throw new ValidationException(
+					String.format(
+						"Error creating sink format '%s' in option space '%s'.",
+						formatFactory.factoryIdentifier(),
+						formatPrefix),
+					t);
+			}
+		}
+
+		/**
+		 * Validates the options of the {@link DynamicTableFactory}. It checks for unconsumed option
+		 * keys.
+		 *
+		 * @throws ValidationException if a declared option is missing or invalid, or if the table
+		 *     contains option keys that no involved factory declared
+		 */
+		public void validate() {
+			validateFactoryOptions(tableFactory, allOptions);
+
+			final Set<String> unsupportedKeys = new HashSet<>(allOptions.keySet());
+			unsupportedKeys.removeAll(consumedOptionKeys);
+			if (unsupportedKeys.isEmpty()) {
+				return;
+			}
+
+			final String connector = tableFactory.factoryIdentifier();
+			final String unsupported = unsupportedKeys.stream()
+				.sorted()
+				.collect(Collectors.joining("\n"));
+			final String supported = consumedOptionKeys.stream()
+				.sorted()
+				.collect(Collectors.joining("\n"));
+			throw new ValidationException(
+				String.format(
+					"Unsupported options found for connector '%s'.\n\n" +
+					"Unsupported options:\n\n" +
+					"%s\n\n" +
+					"Supported options:\n\n" +
+					"%s",
+					connector,
+					unsupported,
+					supported));
+		}
+
+		/**
+		 * Returns all options of the table.
+		 */
+		public ReadableConfig getOptions() {
+			return allOptions;
+		}
+
+		// ----------------------------------------------------------------------------------------
+
+		/**
+		 * Looks up the format factory named by the given option, if the option has a value, and
+		 * registers the prefixed option keys of that factory as consumed.
+		 */
+		private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			final String identifier = allOptions.get(formatOption);
+			if (identifier == null) {
+				return Optional.empty();
+			}
+
+			final F factory = discoverFactory(
+				context.getClassLoader(),
+				formatFactoryClass,
+				identifier);
+
+			// log all used options of other factories
+			for (ConfigOption<?> option : factory.requiredOptions()) {
+				consumedOptionKeys.add(formatPrefix + option.key());
+			}
+			for (ConfigOption<?> option : factory.optionalOptions()) {
+				consumedOptionKeys.add(formatPrefix + option.key());
+			}
+			return Optional.of(factory);
+		}
+
+		/**
+		 * Returns a view on all options that is restricted to the given prefix.
+		 */
+		private ReadableConfig projectOptions(String formatPrefix) {
+			return new DelegatingConfiguration(allOptions, formatPrefix);
+		}
+	}
+
+	/**
+	 * Immutable {@link DynamicTableFactory.Context} that returns the values passed to its constructor.
+	 */
+	private static class DefaultDynamicTableContext implements DynamicTableFactory.Context {
+
+		private final ObjectIdentifier identifier;
+		private final CatalogTable table;
+		private final ReadableConfig config;
+		private final ClassLoader loader;
+
+		DefaultDynamicTableContext(
+				ObjectIdentifier objectIdentifier,
+				CatalogTable catalogTable,
+				ReadableConfig configuration,
+				ClassLoader classLoader) {
+			identifier = objectIdentifier;
+			table = catalogTable;
+			config = configuration;
+			loader = classLoader;
+		}
+
+		@Override
+		public ObjectIdentifier getObjectIdentifier() {
+			return identifier;
+		}
+
+		@Override
+		public CatalogTable getCatalogTable() {
+			return table;
+		}
+
+		@Override
+		public ReadableConfig getConfiguration() {
+			return config;
+		}
+
+		@Override
+		public ClassLoader getClassLoader() {
+			return loader;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private FactoryUtil() {
+		// no instantiation
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
new file mode 100644
index 0000000..184c432
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+/**
+ * Base interface for configuring a {@link ScanFormat} for a {@link ScanTableSource}.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface helps in making such formats pluggable.
+ *
+ * <p>The created {@link Format} instance is an intermediate representation that can be used to construct
+ * a runtime implementation in a later step.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ *
+ * @param <I> runtime interface needed by the table source
+ */
+@PublicEvolving
+public interface ScanFormatFactory<I> extends Factory {
+
+	/**
+	 * Creates a format from the given context and format options.
+	 *
+	 * <p>The format options have been projected to top-level options, i.e. the prefix of the option space
+	 * has been removed (e.g. from {@code key.format.ignore-errors} to {@code ignore-errors}).
+	 */
+	ScanFormat<I> createScanFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
new file mode 100644
index 0000000..9b669df
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Factory for creating a {@link SinkFormat} for {@link SerializationSchema}.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@PublicEvolving
+public interface SerializationFormatFactory extends SinkFormatFactory<SerializationSchema<RowData>> {
+	// interface is used for discovery but is already fully specified by the generics
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
new file mode 100644
index 0000000..4212ea6
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+/**
+ * Base interface for configuring a {@link SinkFormat} for a {@link org.apache.flink.table.connector.sink.DynamicTableSink}.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for rows.
+ * This interface makes the writing side pluggable; {@link ScanFormatFactory} does so for a {@link ScanTableSource}.
+ *
+ * <p>The created {@link Format} instance is an intermediate representation that can be used to construct
+ * a runtime implementation in a later step.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ *
+ * @param <I> runtime interface needed by the table sink
+ */
+@PublicEvolving
+public interface SinkFormatFactory<I> extends Factory {
+
+	/**
+	 * Creates a format from the given context and format options.
+	 *
+	 * <p>The format options have been projected to top-level options, i.e. the prefix of the option space
+	 * has been removed (e.g. from {@code key.format.ignore-errors} to {@code ignore-errors}).
+	 */
+	SinkFormat<I> createSinkFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
new file mode 100644
index 0000000..15f6412
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSinkMock;
+import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSourceMock;
+import org.apache.flink.table.factories.TestFormatFactory.ScanFormatMock;
+import org.apache.flink.table.factories.TestFormatFactory.SinkFormatMock;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FactoryUtil}.
+ */
+public class FactoryUtilTest {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testMissingConnector() {
+		expectError("Table options do not contain an option key 'connector' for discovering a connector.");
+		testError(options -> options.remove("connector"));
+	}
+
+	@Test
+	public void testInvalidConnector() {
+		expectError(
+			"Could not find any factory for identifier 'FAIL' that implements '" +
+				DynamicTableSourceFactory.class.getName() + "' in the classpath.\n\n" +
+			"Available factory identifiers are:\n\n" +
+			"test-connector");
+		testError(options -> options.put("connector", "FAIL"));
+	}
+
+	@Test
+	public void testMissingConnectorOption() {
+		expectError(
+			"One or more required options are missing.\n\n" +
+			"Missing required options are:\n\n" +
+			"target");
+		testError(options -> options.remove("target"));
+	}
+
+	@Test
+	public void testInvalidConnectorOption() {
+		expectError("Invalid value for option 'buffer-size'.");
+		testError(options -> options.put("buffer-size", "FAIL"));
+	}
+
+	@Test
+	public void testMissingFormat() {
+		expectError("Could not find required scan format 'value.format.kind'.");
+		testError(options -> options.remove("value.format.kind"));
+	}
+
+	@Test
+	public void testInvalidFormat() {
+		expectError(
+			"Could not find any factory for identifier 'FAIL' that implements '" +
+				DeserializationFormatFactory.class.getName() + "' in the classpath.\n\n" +
+			"Available factory identifiers are:\n\n" +
+			"test-format");
+		testError(options -> options.put("value.format.kind", "FAIL"));
+	}
+
+	@Test
+	public void testMissingFormatOption() {
+		expectError(
+			"Error creating scan format 'test-format' in option space 'key.format.'.");
+		expectError(
+			"One or more required options are missing.\n\n" +
+			"Missing required options are:\n\n" +
+			"delimiter");
+		testError(options -> options.remove("key.format.delimiter"));
+	}
+
+	@Test
+	public void testInvalidFormatOption() {
+		expectError("Invalid value for option 'fail-on-missing'.");
+		testError(options -> options.put("key.format.fail-on-missing", "FAIL"));
+	}
+
+	@Test
+	public void testUnconsumedOption() {
+		expectError(
+			"Unsupported options found for connector 'test-connector'.\n\n" +
+			"Unsupported options:\n\n" +
+			"this-is-also-not-consumed\n" +
+			"this-is-not-consumed\n\n" +
+			"Supported options:\n\n" +
+			"buffer-size\n" +
+			"connector\n" +
+			"key.format.delimiter\n" +
+			"key.format.fail-on-missing\n" +
+			"key.format.kind\n" +
+			"property-version\n" +
+			"target\n" +
+			"value.format.delimiter\n" +
+			"value.format.fail-on-missing\n" +
+			"value.format.kind");
+		testError(options -> {
+			options.put("this-is-not-consumed", "42");
+			options.put("this-is-also-not-consumed", "true");
+		});
+	}
+
+	@Test
+	public void testAllOptions() {
+		final Map<String, String> options = createAllOptions();
+		final DynamicTableSource actualSource = createTableSource(options);
+		final DynamicTableSource expectedSource = new DynamicTableSourceMock(
+			"MyTarget",
+			new ScanFormatMock(",", false),
+			new ScanFormatMock("|", true));
+		assertEquals(expectedSource, actualSource);
+		final DynamicTableSink actualSink = createTableSink(options);
+		final DynamicTableSink expectedSink = new DynamicTableSinkMock(
+			"MyTarget",
+			1000L,
+			new SinkFormatMock(","),
+			new SinkFormatMock("|"));
+		assertEquals(expectedSink, actualSink);
+	}
+
+	@Test
+	public void testOptionalFormat() {
+		final Map<String, String> options = createAllOptions();
+		options.remove("key.format.kind");
+		options.remove("key.format.delimiter");
+		final DynamicTableSource actualSource = createTableSource(options);
+		final DynamicTableSource expectedSource = new DynamicTableSourceMock(
+			"MyTarget",
+			null,
+			new ScanFormatMock("|", true));
+		assertEquals(expectedSource, actualSource);
+		final DynamicTableSink actualSink = createTableSink(options);
+		final DynamicTableSink expectedSink = new DynamicTableSinkMock(
+			"MyTarget",
+			1000L,
+			null,
+			new SinkFormatMock("|"));
+		assertEquals(expectedSink, actualSink);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void expectError(String message) {
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException(message)));
+	}
+
+	private static void testError(Consumer<Map<String, String>> optionModifier) {
+		final Map<String, String> options = createAllOptions();
+		optionModifier.accept(options);
+		createTableSource(options);
+	}
+
+	private static Map<String, String> createAllOptions() {
+		final Map<String, String> options = new HashMap<>();
+		// we use strings here to test realistic parsing
+		options.put("property-version", "1");
+		options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+		options.put("target", "MyTarget");
+		options.put("buffer-size", "1000");
+		options.put("key.format.kind", TestFormatFactory.IDENTIFIER);
+		options.put("key.format.delimiter", ",");
+		options.put("value.format.kind", TestFormatFactory.IDENTIFIER);
+		options.put("value.format.delimiter", "|");
+		options.put("value.format.fail-on-missing", "true");
+		return options;
+	}
+
+	private static DynamicTableSource createTableSource(Map<String, String> options) {
+		return FactoryUtil.createTableSource(
+			null,
+			ObjectIdentifier.of("cat", "db", "table"),
+			new CatalogTableMock(options),
+			new Configuration(),
+			FactoryUtilTest.class.getClassLoader());
+	}
+
+	private static DynamicTableSink createTableSink(Map<String, String> options) {
+		return FactoryUtil.createTableSink(
+			null,
+			ObjectIdentifier.of("cat", "db", "table"),
+			new CatalogTableMock(options),
+			new Configuration(),
+			FactoryUtilTest.class.getClassLoader());
+	}
+
+	private static class CatalogTableMock implements CatalogTable {
+
+		final Map<String, String> options;
+
+		CatalogTableMock(Map<String, String> options) {
+			this.options = options;
+		}
+
+		@Override
+		public boolean isPartitioned() {
+			return false;
+		}
+
+		@Override
+		public List<String> getPartitionKeys() {
+			return null;
+		}
+
+		@Override
+		public CatalogTable copy(Map<String, String> options) {
+			return null;
+		}
+
+		@Override
+		public Map<String, String> toProperties() {
+			return null;
+		}
+
+		@Override
+		public Map<String, String> getProperties() {
+			return options;
+		}
+
+		@Override
+		public TableSchema getSchema() {
+			return null;
+		}
+
+		@Override
+		public String getComment() {
+			return null;
+		}
+
+		@Override
+		public CatalogBaseTable copy() {
+			return null;
+		}
+
+		@Override
+		public Optional<String> getDescription() {
+			return Optional.empty();
+		}
+
+		@Override
+		public Optional<String> getDetailedDescription() {
+			return Optional.empty();
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
new file mode 100644
index 0000000..ff5a21f
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Test implementations for {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory}.
+ *
+ * <p>The factory identifies itself as {@link #IDENTIFIER}. It requires {@link #TARGET} and
+ * {@link #VALUE_FORMAT} and optionally accepts {@link #BUFFER_SIZE} and {@link #KEY_FORMAT}.
+ * The created source and sink are mocks that only record the resolved option values and formats
+ * and compare by value.
+ */
+public final class TestDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	/** Identifier returned by {@link #factoryIdentifier()}. */
+	public static final String IDENTIFIER = "test-connector";
+
+	/** Required string option without a default value. */
+	public static final ConfigOption<String> TARGET = ConfigOptions
+		.key("target")
+		.stringType()
+		.noDefaultValue();
+
+	/** Optional long option that defaults to 100; only read when creating a sink. */
+	public static final ConfigOption<Long> BUFFER_SIZE = ConfigOptions
+		.key("buffer-size")
+		.longType()
+		.defaultValue(100L);
+
+	/** Optional option that is handed to the helper for discovering the key format. */
+	public static final ConfigOption<String> KEY_FORMAT = ConfigOptions
+		.key("key.format.kind")
+		.stringType()
+		.noDefaultValue();
+
+	/** Required option that is handed to the helper for discovering the value format. */
+	public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions
+		.key("value.format.kind")
+		.stringType()
+		.noDefaultValue();
+
+	/**
+	 * Creates a {@link DynamicTableSourceMock} from the given context.
+	 *
+	 * <p>The key format is discovered optionally and stored as {@code null} if absent, the value
+	 * format is discovered mandatorily. {@code helper.validate()} is called after both discoveries
+	 * and before any option value is read.
+	 *
+	 * @param context context that is passed on to the {@link TableFactoryHelper}
+	 * @return mock holding the value of {@link #TARGET} and the discovered formats
+	 */
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final Optional<ScanFormat<DeserializationSchema<RowData>>> keyFormat = helper.discoverOptionalScanFormat(
+			DeserializationFormatFactory.class,
+			KEY_FORMAT,
+			FactoryUtil.KEY_FORMAT_PREFIX);
+		final ScanFormat<DeserializationSchema<RowData>> valueFormat = helper.discoverScanFormat(
+			DeserializationFormatFactory.class,
+			VALUE_FORMAT,
+			FactoryUtil.VALUE_FORMAT_PREFIX);
+		helper.validate();
+
+		return new DynamicTableSourceMock(
+			helper.getOptions().get(TARGET),
+			keyFormat.orElse(null),
+			valueFormat);
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSinkMock} from the given context.
+	 *
+	 * <p>The key format is discovered optionally and stored as {@code null} if absent, the value
+	 * format is discovered mandatorily. {@code helper.validate()} is called after both discoveries
+	 * and before any option value is read.
+	 *
+	 * @param context context that is passed on to the {@link TableFactoryHelper}
+	 * @return mock holding the values of {@link #TARGET} and {@link #BUFFER_SIZE} as well as the
+	 *     discovered formats
+	 */
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final Optional<SinkFormat<SerializationSchema<RowData>>> keyFormat = helper.discoverOptionalSinkFormat(
+			SerializationFormatFactory.class,
+			KEY_FORMAT,
+			FactoryUtil.KEY_FORMAT_PREFIX);
+		final SinkFormat<SerializationSchema<RowData>> valueFormat = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			VALUE_FORMAT,
+			FactoryUtil.VALUE_FORMAT_PREFIX);
+		helper.validate();
+
+		return new DynamicTableSinkMock(
+			helper.getOptions().get(TARGET),
+			helper.getOptions().get(BUFFER_SIZE),
+			keyFormat.orElse(null),
+			valueFormat);
+	}
+
+	/** Returns {@link #IDENTIFIER}. */
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	/** Returns a new mutable set containing {@link #TARGET} and {@link #VALUE_FORMAT}. */
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(TARGET);
+		options.add(VALUE_FORMAT);
+		return options;
+	}
+
+	/** Returns a new mutable set containing {@link #BUFFER_SIZE} and {@link #KEY_FORMAT}. */
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(BUFFER_SIZE);
+		options.add(KEY_FORMAT);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table source
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link DynamicTableSource} for testing.
+	 *
+	 * <p>It only stores the values it was created with. All interface methods return {@code null};
+	 * equality and hash code are derived from the three stored fields.
+	 */
+	public static class DynamicTableSourceMock implements ScanTableSource {
+
+		public final String target;
+		public final @Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat;
+		public final ScanFormat<DeserializationSchema<RowData>> sourceValueFormat;
+
+		DynamicTableSourceMock(
+				String target,
+				@Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat,
+				ScanFormat<DeserializationSchema<RowData>> sourceValueFormat) {
+			this.target = target;
+			this.sourceKeyFormat = sourceKeyFormat;
+			this.sourceValueFormat = sourceValueFormat;
+		}
+
+		/** Not implemented by this mock; always {@code null}. */
+		@Override
+		public ChangelogMode getChangelogMode() {
+			return null;
+		}
+
+		/** Not implemented by this mock; always {@code null}. */
+		@Override
+		public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+			return null;
+		}
+
+		/** Not implemented by this mock; always {@code null}. */
+		@Override
+		public DynamicTableSource copy() {
+			return null;
+		}
+
+		/** Not implemented by this mock; always {@code null}. */
+		@Override
+		public String asSummaryString() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			final DynamicTableSourceMock that = (DynamicTableSourceMock) o;
+			// compare every field null-safely so that equals() tolerates the same null values
+			// as hashCode() instead of failing with a NullPointerException
+			return Objects.equals(target, that.target) &&
+				Objects.equals(sourceKeyFormat, that.sourceKeyFormat) &&
+				Objects.equals(sourceValueFormat, that.sourceValueFormat);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(target, sourceKeyFormat, sourceValueFormat);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table sink
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link DynamicTableSink} for testing.
+	 *
+	 * <p>It only stores the values it was created with. All interface methods return {@code null};
+	 * equality and hash code are derived from the four stored fields.
+	 */
+	public static class DynamicTableSinkMock implements DynamicTableSink {
+
+		public final String target;
+		public final Long bufferSize;
+		public final @Nullable SinkFormat<SerializationSchema<RowData>> sinkKeyFormat;
+		public final SinkFormat<SerializationSchema<RowData>> sinkValueFormat;
+
+		DynamicTableSinkMock(
+				String target,
+				Long bufferSize,
+				@Nullable SinkFormat<SerializationSchema<RowData>> sinkKeyFormat,
+				SinkFormat<SerializationSchema<RowData>> sinkValueFormat) {
+			this.target = target;
+			this.bufferSize = bufferSize;
+			this.sinkKeyFormat = sinkKeyFormat;
+			this.sinkValueFormat = sinkValueFormat;
+		}
+
+		/** Not implemented by this mock; always {@code null}, the requested mode is ignored. */
+		@Override
+		public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+			return null;
+		}
+
+		/** Not implemented by this mock; always {@code null}. */
+		@Override
+		public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+			return null;
+		}
+
+		/** Not implemented by this mock; always {@code null}. */
+		@Override
+		public DynamicTableSink copy() {
+			return null;
+		}
+
+		/** Not implemented by this mock; always {@code null}. */
+		@Override
+		public String asSummaryString() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			final DynamicTableSinkMock that = (DynamicTableSinkMock) o;
+			// compare every field null-safely so that equals() tolerates the same null values
+			// as hashCode() instead of failing with a NullPointerException
+			return Objects.equals(target, that.target) &&
+				Objects.equals(bufferSize, that.bufferSize) &&
+				Objects.equals(sinkKeyFormat, that.sinkKeyFormat) &&
+				Objects.equals(sinkValueFormat, that.sinkValueFormat);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(target, bufferSize, sinkKeyFormat, sinkValueFormat);
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
new file mode 100644
index 0000000..c04bdb6
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Test implementations for {@link DeserializationFormatFactory} and {@link SerializationFormatFactory}.
+ *
+ * <p>The factory identifies itself as {@link #IDENTIFIER}. It requires {@link #DELIMITER} and
+ * optionally accepts {@link #FAIL_ON_MISSING}. The created formats are mocks that only record
+ * the resolved option values and compare by value.
+ */
+public class TestFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
+
+	/** Identifier returned by {@link #factoryIdentifier()}. */
+	public static final String IDENTIFIER = "test-format";
+
+	/** Required string option without a default value. */
+	public static final ConfigOption<String> DELIMITER = ConfigOptions
+		.key("delimiter")
+		.stringType()
+		.noDefaultValue();
+
+	/** Optional boolean option that defaults to {@code false}; only read for the scan format. */
+	public static final ConfigOption<Boolean> FAIL_ON_MISSING = ConfigOptions
+		.key("fail-on-missing")
+		.booleanType()
+		.defaultValue(false);
+
+	/**
+	 * Creates a {@link ScanFormatMock} from the given format options.
+	 *
+	 * <p>{@link FactoryUtil#validateFactoryOptions} is called on the options before they are read.
+	 *
+	 * @param context table factory context; not used by this implementation
+	 * @param formatConfig options from which {@link #DELIMITER} and {@link #FAIL_ON_MISSING}
+	 *     are read
+	 * @return mock holding both option values
+	 */
+	@Override
+	public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+			DynamicTableFactory.Context context,
+			ReadableConfig formatConfig) {
+		FactoryUtil.validateFactoryOptions(this, formatConfig);
+		return new ScanFormatMock(formatConfig.get(DELIMITER), formatConfig.get(FAIL_ON_MISSING));
+	}
+
+	/**
+	 * Creates a {@link SinkFormatMock} from the given format options.
+	 *
+	 * <p>{@link FactoryUtil#validateFactoryOptions} is called on the options before they are read.
+	 *
+	 * @param context table factory context; not used by this implementation
+	 * @param formatConfig options from which {@link #DELIMITER} is read
+	 * @return mock holding the delimiter
+	 */
+	@Override
+	public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+			DynamicTableFactory.Context context,
+			ReadableConfig formatConfig) {
+		FactoryUtil.validateFactoryOptions(this, formatConfig);
+		return new SinkFormatMock(formatConfig.get(DELIMITER));
+	}
+
+	/** Returns {@link #IDENTIFIER}. */
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	/** Returns a new mutable set containing only {@link #DELIMITER}. */
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(DELIMITER);
+		return options;
+	}
+
+	/** Returns a new mutable set containing only {@link #FAIL_ON_MISSING}. */
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FAIL_ON_MISSING);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table source format
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link ScanFormat} for testing.
+	 *
+	 * <p>It only stores the values it was created with. All interface methods return {@code null};
+	 * equality and hash code are derived from the two stored fields.
+	 */
+	public static class ScanFormatMock implements ScanFormat<DeserializationSchema<RowData>> {
+
+		public final String delimiter;
+		public final Boolean failOnMissing;
+
+		ScanFormatMock(String delimiter, Boolean failOnMissing) {
+			this.delimiter = delimiter;
+			this.failOnMissing = failOnMissing;
+		}
+
+		/** Not implemented by this mock; always {@code null}. */
+		@Override
+		public DeserializationSchema<RowData> createScanFormat(
+				ScanTableSource.Context context,
+				DataType producedDataType) {
+			return null;
+		}
+
+		/** Not implemented by this mock; always {@code null}. */
+		@Override
+		public ChangelogMode getChangelogMode() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			final ScanFormatMock that = (ScanFormatMock) o;
+			// compare every field null-safely so that equals() tolerates the same null values
+			// as hashCode() instead of failing with a NullPointerException
+			return Objects.equals(delimiter, that.delimiter) &&
+				Objects.equals(failOnMissing, that.failOnMissing);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(delimiter, failOnMissing);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table sink format
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link SinkFormat} for testing.
+	 *
+	 * <p>It only stores the delimiter it was created with. All interface methods return
+	 * {@code null}; equality and hash code are derived from the delimiter.
+	 */
+	public static class SinkFormatMock implements SinkFormat<SerializationSchema<RowData>> {
+
+		public final String delimiter;
+
+		SinkFormatMock(String delimiter) {
+			this.delimiter = delimiter;
+		}
+
+		/**
+		 * Not implemented by this mock; always {@code null}.
+		 *
+		 * <p>NOTE(review): the context parameter is a {@link ScanTableSource.Context} although this
+		 * is a sink format. The type has to match the overridden interface method; confirm whether
+		 * the interface intends a sink-side context here.
+		 */
+		@Override
+		public SerializationSchema<RowData> createSinkFormat(
+				ScanTableSource.Context context,
+				DataType consumeDataType) {
+			return null;
+		}
+
+		/** Not implemented by this mock; always {@code null}. */
+		@Override
+		public ChangelogMode getChangelogMode() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			final SinkFormatMock that = (SinkFormatMock) o;
+			// null-safe so that equals() tolerates the same null value as hashCode()
+			return Objects.equals(delimiter, that.delimiter);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(delimiter);
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..d31007a
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.factories.TestDynamicTableFactory
+org.apache.flink.table.factories.TestFormatFactory