You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/05 18:48:51 UTC

[flink] branch master updated (ed40049 -> fef974d)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ed40049  [FLINK-15101][connector/common] Add the SourceCoordinator implementation
     new 5aec41d  [hotfix][table-common] Improve terminology around catalog table options
     new 5adbba8  [hotfix][table-common] Enable receiving isBounded in DynamicTableSink
     new fef974d  [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-python/pyflink/table/catalog.py              |  13 +
 .../table/tests/test_catalog_completeness.py       |   1 +
 .../org/apache/flink/table/catalog/Catalog.java    |  20 +
 .../flink/table/catalog/CatalogBaseTable.java      |  18 +-
 .../apache/flink/table/catalog/CatalogTable.java   |  14 +-
 .../flink/table/connector/format/Format.java       |  55 ++
 .../flink/table/connector/format/ScanFormat.java   |  37 ++
 .../flink/table/connector/format/SinkFormat.java   |  38 ++
 .../table/connector/sink/DynamicTableSink.java     |   7 +
 .../factories/DeserializationFormatFactory.java    |  34 ++
 .../flink/table/factories/DynamicTableFactory.java |  73 +++
 .../table/factories/DynamicTableSinkFactory.java   |  42 ++
 .../table/factories/DynamicTableSourceFactory.java |  42 ++
 .../org/apache/flink/table/factories/Factory.java  |  79 +++
 .../apache/flink/table/factories/FactoryUtil.java  | 570 +++++++++++++++++++++
 .../flink/table/factories/ScanFormatFactory.java   |  50 ++
 .../factories/SerializationFormatFactory.java      |  34 ++
 .../flink/table/factories/SinkFormatFactory.java   |  50 ++
 .../flink/table/factories/FactoryUtilTest.java     | 285 +++++++++++
 .../table/factories/TestDynamicTableFactory.java   | 257 ++++++++++
 .../flink/table/factories/TestFormatFactory.java   | 180 +++++++
 .../org.apache.flink.table.factories.Factory       |  17 +
 22 files changed, 1906 insertions(+), 10 deletions(-)
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
 create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
 create mode 100644 flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
 create mode 100644 flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory


[flink] 03/03: [FLINK-16997][table-common] Add new factory interfaces and discovery utilities

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fef974d7df1a9dabd62f3288c086185d69298bc1
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Apr 30 11:03:47 2020 +0200

    [FLINK-16997][table-common] Add new factory interfaces and discovery utilities
    
    Implements the new factory interfaces mentioned in FLIP-95. Adds new factory utilities
    that can be used for FLIP-122 and future factories.
    
    It adds TestDynamicTableFactory and TestFormatFactory for reference implementations
    of new factories.
    
    This closes #11959.
---
 .../table/tests/test_catalog_completeness.py       |   1 +
 .../org/apache/flink/table/catalog/Catalog.java    |  20 +
 .../flink/table/connector/format/Format.java       |  55 ++
 .../flink/table/connector/format/ScanFormat.java   |  37 ++
 .../flink/table/connector/format/SinkFormat.java   |  38 ++
 .../factories/DeserializationFormatFactory.java    |  34 ++
 .../flink/table/factories/DynamicTableFactory.java |  73 +++
 .../table/factories/DynamicTableSinkFactory.java   |  42 ++
 .../table/factories/DynamicTableSourceFactory.java |  42 ++
 .../org/apache/flink/table/factories/Factory.java  |  79 +++
 .../apache/flink/table/factories/FactoryUtil.java  | 570 +++++++++++++++++++++
 .../flink/table/factories/ScanFormatFactory.java   |  50 ++
 .../factories/SerializationFormatFactory.java      |  34 ++
 .../flink/table/factories/SinkFormatFactory.java   |  50 ++
 .../flink/table/factories/FactoryUtilTest.java     | 285 +++++++++++
 .../table/factories/TestDynamicTableFactory.java   | 257 ++++++++++
 .../flink/table/factories/TestFormatFactory.java   | 180 +++++++
 .../org.apache.flink.table.factories.Factory       |  17 +
 18 files changed, 1864 insertions(+)

diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index 3e83357..245a2be 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -43,6 +43,7 @@ class CatalogAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCa
         return {
             'open',
             'close',
+            'getFactory',
             'getTableFactory',
             'getFunctionDefinitionFactory',
             'listPartitionsByFilter'}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index b4ff5f3..1e4c482 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -35,6 +35,8 @@ import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 
@@ -49,11 +51,29 @@ import java.util.Optional;
 public interface Catalog {
 
 	/**
+	 * Returns a factory for creating instances from catalog objects.
+	 *
+	 * <p>This method enables bypassing the discovery process. Implementers can directly pass internal
+	 * catalog-specific objects to their own factory. For example, a custom {@link CatalogTable} can
+	 * be processed by a custom {@link DynamicTableFactory}.
+	 *
+	 * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement multiple
+	 * supported extension points. An {@code instanceof} check is performed by the caller that checks
+	 * whether a required factory is implemented; otherwise the discovery process is used.
+	 */
+	default Optional<Factory> getFactory() {
+		return Optional.empty();
+	}
+
+	/**
 	 * Get an optional {@link TableFactory} instance that's responsible for generating table-related
 	 * instances stored in this catalog, instances such as source/sink.
 	 *
 	 * @return an optional TableFactory instance
+	 * @deprecated Use {@link #getFactory()} for the new factory stack. The new factory stack uses the
+	 *             new table sources and sinks defined in FLIP-95 and a slightly different discovery mechanism.
 	 */
+	@Deprecated
 	default Optional<TableFactory> getTableFactory() {
 		return Optional.empty();
 	}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
new file mode 100644
index 0000000..40c0115
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+/**
+ * Base interface for connector formats.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface is an intermediate representation before constructing the
+ * actual runtime implementation.
+ *
+ * <p>Formats can be distinguished along two dimensions:
+ * <ul>
+ *     <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}).
+ *     <li>Runtime implementation interface that is required (e.g. {@link DeserializationSchema} or
+ *     some bulk interface).</li>
+ * </ul>
+ *
+ * <p>A {@link DynamicTableFactory} can search for a format that is accepted by the connector.
+ *
+ * @see ScanFormat
+ * @see SinkFormat
+ */
+@PublicEvolving
+public interface Format {
+
+	/**
+	 * Returns the set of changes that a connector (and transitively the planner) can expect during
+	 * runtime.
+	 */
+	ChangelogMode getChangelogMode();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
new file mode 100644
index 0000000..529ea37
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * A {@link Format} for a {@link ScanTableSource}.
+ *
+ * @param <I> runtime interface needed by the table source
+ */
+@PublicEvolving
+public interface ScanFormat<I> extends Format {
+
+	/**
+	 * Creates a runtime implementation that is configured to produce data of the given data type.
+	 */
+	I createScanFormat(ScanTableSource.Context context, DataType producedDataType);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
new file mode 100644
index 0000000..0b67336
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * A {@link Format} for a {@link DynamicTableSink}.
+ *
+ * @param <I> runtime interface needed by the table sink
+ */
+@PublicEvolving
+public interface SinkFormat<I> extends Format {
+
+	/**
+	 * Creates a runtime implementation that is configured to consume data of the given data type.
+	 */
+	I createSinkFormat(ScanTableSource.Context context, DataType consumedDataType);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
new file mode 100644
index 0000000..1901fc5
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Factory for creating a {@link ScanFormat} for {@link DeserializationSchema}.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@PublicEvolving
+public interface DeserializationFormatFactory extends ScanFormatFactory<DeserializationSchema<RowData>> {
+  // interface is used for discovery but is already fully specified by the generics
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
new file mode 100644
index 0000000..0d4f8ba
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+
+/**
+ * Base interface for configuring a dynamic table connector for an external storage system from catalog
+ * and session information.
+ *
+ * <p>Dynamic tables are the core concept of Flink's Table & SQL API for processing both bounded and
+ * unbounded data in a unified fashion.
+ *
+ * <p>Implement {@link DynamicTableSourceFactory} for constructing a {@link DynamicTableSource}.
+ *
+ * <p>Implement {@link DynamicTableSinkFactory} for constructing a {@link DynamicTableSink}.
+ *
+ * <p>The options {@link FactoryUtil#PROPERTY_VERSION} and {@link FactoryUtil#CONNECTOR} are implicitly
+ * added and must not be declared.
+ */
+@PublicEvolving
+public interface DynamicTableFactory extends Factory {
+
+	/**
+	 * Provides catalog and session information describing the dynamic table to be accessed.
+	 */
+	interface Context {
+
+		/**
+		 * Returns the identifier of the table in the {@link Catalog}.
+		 */
+		ObjectIdentifier getObjectIdentifier();
+
+		/**
+		 * Returns table information received from the {@link Catalog}.
+		 */
+		CatalogTable getCatalogTable();
+
+		/**
+		 * Gives read-only access to the configuration of the current session.
+		 */
+		ReadableConfig getConfiguration();
+
+		/**
+		 * Returns the class loader of the current session.
+		 *
+		 * <p>The class loader is in particular useful for discovering further (nested) factories.
+		 */
+		ClassLoader getClassLoader();
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java
new file mode 100644
index 0000000..7f428fe
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Creates a {@link DynamicTableSink} instance from a {@link CatalogTable} and additional context
+ * information.
+ *
+ * <p>See {@link Factory} for more information about the general design of a factory.
+ */
+@PublicEvolving
+public interface DynamicTableSinkFactory extends DynamicTableFactory {
+
+	/**
+	 * Creates a {@link DynamicTableSink} instance from a {@link CatalogTable} and additional context
+	 * information.
+	 *
+	 * <p>An implementation should perform validation and the discovery of further (nested) factories
+	 * in this method.
+	 */
+	DynamicTableSink createDynamicTableSink(Context context);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java
new file mode 100644
index 0000000..3c0ecd1
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+
+/**
+ * Creates a {@link DynamicTableSource} instance from a {@link CatalogTable} and additional context
+ * information.
+ *
+ * <p>See {@link Factory} for more information about the general design of a factory.
+ */
+@PublicEvolving
+public interface DynamicTableSourceFactory extends DynamicTableFactory {
+
+	/**
+	 * Creates a {@link DynamicTableSource} instance from a {@link CatalogTable} and additional context
+	 * information.
+	 *
+	 * <p>An implementation should perform validation and the discovery of further (nested) factories
+	 * in this method.
+	 */
+	DynamicTableSource createDynamicTableSource(Context context);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java
new file mode 100644
index 0000000..b669e76
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.util.Set;
+
+/**
+ * Base interface for all kinds of factories that create object instances from a list of key-value pairs
+ * in Flink's Table & SQL API.
+ *
+ * <p>A factory is uniquely identified by {@link Class} and {@link #factoryIdentifier()}.
+ *
+ * <p>The list of available factories is discovered using Java's Service Provider Interfaces (SPI). Classes
+ * that implement this interface can be added to {@code META-INF/services/org.apache.flink.table.factories.Factory}
+ * in JAR files.
+ *
+ * <p>Every factory declares a set of required and optional options. This information will not be used
+ * during discovery but is helpful when generating documentation and performing validation. A factory may
+ * discover further (nested) factories, the options of the nested factories must not be declared in the
+ * sets of this factory.
+ *
+ * <p>It is the responsibility of each factory to perform validation before returning an instance.
+ *
+ * <p>For consistency, the following style for key names of {@link ConfigOption} is recommended:
+ * <ul>
+ *     <li>Try to <b>reuse</b> key names as much as possible. Use other factory implementations as an example.
+ *     <li>Key names should be declared in <b>lower case</b>. Use "-" instead of dots or camel case to split words.
+ *     <li>Key names should be <b>hierarchical</b> where appropriate. Think about how one would define such
+ *     a hierarchy in JSON or YAML file (e.g. {@code sink.bulk-flush.max-actions}).
+ *     <li>In case of a hierarchy, try not to use the higher level again in the key name (e.g. do {@code sink.partitioner}
+ *     instead of {@code sink.sink-partitioner}) to <b>keep the keys short</b>.
+ * </ul>
+ */
+@PublicEvolving
+public interface Factory {
+
+	/**
+	 * Returns a unique identifier among same factory interfaces.
+	 *
+	 * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code kafka}). If
+	 * multiple factories exist for different versions, a version should be appended using "-" (e.g. {@code kafka-0.10}).
+	 */
+	String factoryIdentifier();
+
+	/**
+	 * Returns a set of {@link ConfigOption} that an implementation of this factory requires in addition to
+	 * {@link #optionalOptions()}.
+	 *
+	 * <p>See the documentation of {@link Factory} for more information.
+	 */
+	Set<ConfigOption<?>> requiredOptions();
+
+	/**
+	 * Returns a set of {@link ConfigOption} that an implementation of this factory consumes in addition to
+	 * {@link #requiredOptions()}.
+	 *
+	 * <p>See the documentation of {@link Factory} for more information.
+	 */
+	Set<ConfigOption<?>> optionalOptions();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
new file mode 100644
index 0000000..fa42b8f
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@PublicEvolving
+public final class FactoryUtil {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+	public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+		.intType()
+		.defaultValue(1)
+		.withDescription(
+			"Version of the overall property design. This option is meant for future backwards compatibility.");
+
+	public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+		.stringType()
+		.noDefaultValue()
+		.withDescription(
+			"Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+			"an external system. Its value is used during table source and table sink discovery.");
+
+	public static final String FORMAT_PREFIX = "format.";
+
+	public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+	public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+	/**
+	 * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSource createTableSource(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSourceFactory factory = getDynamicTableFactory(
+				DynamicTableSourceFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSource(context);
+		} catch (Throwable t) {
+			// wrap any discovery/creation failure and include all table options
+			// (sorted, WITH-clause style) so the failing table is easy to debug
+			throw new ValidationException(
+				String.format(
+					"Unable to create a source for reading table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+	 *
+	 * <p>It considers {@link Catalog#getFactory()} if provided.
+	 */
+	public static DynamicTableSink createTableSink(
+			@Nullable Catalog catalog,
+			ObjectIdentifier objectIdentifier,
+			CatalogTable catalogTable,
+			ReadableConfig configuration,
+			ClassLoader classLoader) {
+		final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+			objectIdentifier,
+			catalogTable,
+			configuration,
+			classLoader);
+		try {
+			final DynamicTableSinkFactory factory = getDynamicTableFactory(
+				DynamicTableSinkFactory.class,
+				catalog,
+				context);
+			return factory.createDynamicTableSink(context);
+		} catch (Throwable t) {
+			// wrap any discovery/creation failure and include all table options
+			// (sorted, WITH-clause style) so the failing table is easy to debug
+			throw new ValidationException(
+				String.format(
+					"Unable to create a sink for writing table '%s'.\n\n" +
+					"Table options are:\n\n" +
+					"%s",
+					objectIdentifier.asSummaryString(),
+					catalogTable.getOptions()
+						.entrySet()
+						.stream()
+						.map(e -> stringifyOption(e.getKey(), e.getValue()))
+						.sorted()
+						.collect(Collectors.joining("\n"))),
+				t);
+		}
+	}
+
+	/**
+	 * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * <p>The following example sketches the usage:
+	 * <pre>{@code
+	 * // in createDynamicTableSource()
+	 * helper = FactoryUtil.createTableFactoryHelper(this, context);
+	 * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix");
+	 * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix");
+	 * helper.validate();
+	 * ... // construct connector with discovered formats
+	 * }</pre>
+	 *
+	 * <p>Note: This utility checks for left-over options in the final step.
+	 */
+	public static TableFactoryHelper createTableFactoryHelper(
+			DynamicTableFactory factory,
+			DynamicTableFactory.Context context) {
+		return new TableFactoryHelper(factory, context);
+	}
+
+	/**
+	 * Discovers a factory using the given factory base class and identifier.
+	 *
+	 * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)}
+	 * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)},
+	 * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}
+	 * are not applicable.
+	 *
+	 * @throws ValidationException if no factory implements the base class, or if zero or more than
+	 *         one of them matches the given identifier
+	 */
+	@SuppressWarnings("unchecked")
+	public static <T extends Factory> T discoverFactory(
+			ClassLoader classLoader,
+			Class<T> factoryClass,
+			String factoryIdentifier) {
+		final List<Factory> factories = discoverFactories(classLoader);
+
+		// restrict to factories that implement the requested base interface
+		final List<Factory> foundFactories = factories.stream()
+			.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+			.collect(Collectors.toList());
+
+		if (foundFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factories that implement '%s' in the classpath.",
+					factoryClass.getName()));
+		}
+
+		final List<Factory> matchingFactories = foundFactories.stream()
+			.filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+			.collect(Collectors.toList());
+
+		if (matchingFactories.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" +
+					"Available factory identifiers are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					foundFactories.stream()
+						.map(Factory::factoryIdentifier)
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+		if (matchingFactories.size() > 1) {
+			throw new ValidationException(
+				String.format(
+					"Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" +
+					"Ambiguous factory classes are:\n\n" +
+					"%s",
+					factoryIdentifier,
+					factoryClass.getName(),
+					// fix: list the class names of the *matching* factories; previously this
+					// mapped over foundFactories and printed the List container's class name
+					// ("factories.getClass()") for every entry
+					matchingFactories.stream()
+						.map(f -> f.getClass().getName())
+						.sorted()
+						.collect(Collectors.joining("\n"))));
+		}
+
+		return (T) matchingFactories.get(0);
+	}
+
+	/**
+	 * Validates the required and optional {@link ConfigOption}s of a factory.
+	 *
+	 * <p>Note: It does not check for left-over options.
+	 */
+	public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+		// Flink's options currently have no built-in validation, so every declared option is
+		// read eagerly here to surface parsing errors as early as possible.
+		final List<String> missingRequiredOptions = new LinkedList<>();
+		for (ConfigOption<?> requiredOption : factory.requiredOptions()) {
+			if (readOption(options, requiredOption) == null) {
+				missingRequiredOptions.add(requiredOption.key());
+			}
+		}
+		missingRequiredOptions.sort(String::compareTo);
+
+		if (!missingRequiredOptions.isEmpty()) {
+			throw new ValidationException(
+				String.format(
+					"One or more required options are missing.\n\n" +
+					"Missing required options are:\n\n" +
+					"%s",
+					String.join("\n", missingRequiredOptions)));
+		}
+
+		// optional options are only accessed to provoke parsing errors
+		for (ConfigOption<?> optionalOption : factory.optionalOptions()) {
+			readOption(options, optionalOption);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper methods
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	private static <T extends DynamicTableFactory> T getDynamicTableFactory(
+			Class<T> factoryClass,
+			@Nullable Catalog catalog,
+			DefaultDynamicTableContext context) {
+		// catalog factory has highest precedence: if the catalog itself ships a factory of the
+		// requested kind, it is used directly and service discovery is skipped entirely
+		if (catalog != null) {
+			final Factory factory = catalog.getFactory()
+				.filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+				.orElse(null);
+			if (factory != null) {
+				return (T) factory;
+			}
+		}
+
+		// fallback to factory discovery via the table's 'connector' option
+		final String connectorOption = context.getCatalogTable()
+			.getOptions()
+			.get(CONNECTOR.key());
+		if (connectorOption == null) {
+			throw new ValidationException(
+				String.format(
+					"Table options do not contain an option key '%s' for discovering a connector.",
+					CONNECTOR.key()));
+		}
+		try {
+			return discoverFactory(context.getClassLoader(), factoryClass, connectorOption);
+		} catch (ValidationException e) {
+			// re-wrap to mention which option value triggered the discovery failure
+			throw new ValidationException(
+				String.format(
+					"Cannot discover a connector using option '%s'.",
+					stringifyOption(CONNECTOR.key(), connectorOption)),
+				e);
+		}
+	}
+
+	private static List<Factory> discoverFactories(ClassLoader classLoader) {
+		try {
+			final List<Factory> result = new LinkedList<>();
+			ServiceLoader
+				.load(Factory.class, classLoader)
+				.iterator()
+				.forEachRemaining(result::add);
+			return result;
+		} catch (ServiceConfigurationError e) {
+			LOG.error("Could not load service provider for factories.", e);
+			throw new TableException("Could not load service provider for factories.", e);
+		}
+	}
+
+	private static String stringifyOption(String key, String value) {
+		return String.format(
+			"'%s'='%s'",
+			EncodingUtils.escapeSingleQuotes(key),
+			EncodingUtils.escapeSingleQuotes(value));
+	}
+
+	/**
+	 * Copies the given string map into a fresh {@link Configuration}.
+	 */
+	private static Configuration asConfiguration(Map<String, String> options) {
+		final Configuration configuration = new Configuration();
+		for (Map.Entry<String, String> entry : options.entrySet()) {
+			configuration.setString(entry.getKey(), entry.getValue());
+		}
+		return configuration;
+	}
+
+	/**
+	 * Reads a single option, wrapping any parsing/conversion failure into a
+	 * {@link ValidationException} that names the offending key.
+	 */
+	private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) {
+		try {
+			return options.get(option);
+		} catch (Throwable t) {
+			throw new ValidationException(String.format("Invalid value for option '%s'.", option.key()), t);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Helper classes
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Helper utility for discovering formats and validating all options for a {@link DynamicTableFactory}.
+	 *
+	 * @see #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+	 */
+	public static class TableFactoryHelper {
+
+		private final DynamicTableFactory tableFactory;
+
+		private final DynamicTableFactory.Context context;
+
+		private final Configuration allOptions;
+
+		private final Set<String> consumedOptionKeys;
+
+		private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory.Context context) {
+			this.tableFactory = tableFactory;
+			this.context = context;
+			this.allOptions = asConfiguration(context.getCatalogTable().getOptions());
+			this.consumedOptionKeys = new HashSet<>();
+			// framework-level options are always considered consumed
+			this.consumedOptionKeys.add(PROPERTY_VERSION.key());
+			this.consumedOptionKeys.add(CONNECTOR.key());
+			// every option the table factory declares (required or optional) counts as consumed;
+			// prefixed format options are added later during format discovery
+			this.consumedOptionKeys.addAll(
+				tableFactory.requiredOptions().stream()
+					.map(ConfigOption::key)
+					.collect(Collectors.toSet()));
+			this.consumedOptionKeys.addAll(
+				tableFactory.optionalOptions().stream()
+					.map(ConfigOption::key)
+					.collect(Collectors.toSet()));
+		}
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option as factory identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalScanFormat(formatFactoryClass, formatOption, formatPrefix)
+				.orElseThrow(() ->
+					new ValidationException(
+						String.format("Could not find required scan format '%s'.", formatOption.key())));
+		}
+
+		/**
+		 * Discovers a {@link ScanFormat} of the given type using the given option (if present) as factory
+		 * identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix)
+				.map(formatFactory -> {
+					try {
+						return formatFactory.createScanFormat(context, projectOptions(formatPrefix));
+					} catch (Throwable t) {
+						throw new ValidationException(
+							String.format(
+								"Error creating scan format '%s' in option space '%s'.",
+								formatFactory.factoryIdentifier(),
+								formatPrefix),
+							t);
+					}
+				});
+		}
+
+		/**
+		 * Discovers a {@link SinkFormat} of the given type using the given option as factory identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends SinkFormatFactory<I>> SinkFormat<I> discoverSinkFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalSinkFormat(formatFactoryClass, formatOption, formatPrefix)
+				.orElseThrow(() ->
+					new ValidationException(
+						String.format("Could not find required sink format '%s'.", formatOption.key())));
+		}
+
+		/**
+		 * Discovers a {@link SinkFormat} of the given type using the given option (if present) as factory
+		 * identifier.
+		 *
+		 * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+		 */
+		public <I, F extends SinkFormatFactory<I>> Optional<SinkFormat<I>> discoverOptionalSinkFormat(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix)
+				.map(formatFactory -> {
+					try {
+						return formatFactory.createSinkFormat(context, projectOptions(formatPrefix));
+					} catch (Throwable t) {
+						throw new ValidationException(
+							String.format(
+								"Error creating sink format '%s' in option space '%s'.",
+								formatFactory.factoryIdentifier(),
+								formatPrefix),
+							t);
+					}
+				});
+		}
+
+		/**
+		 * Validates the options of the {@link DynamicTableFactory}. It checks for unconsumed option
+		 * keys.
+		 */
+		public void validate() {
+			validateFactoryOptions(tableFactory, allOptions);
+			// any key that neither the connector factory nor a discovered format factory
+			// declared is reported as unsupported
+			final Set<String> remainingOptionKeys = new HashSet<>(allOptions.keySet());
+			remainingOptionKeys.removeAll(consumedOptionKeys);
+			if (remainingOptionKeys.size() > 0) {
+				throw new ValidationException(
+					String.format(
+						"Unsupported options found for connector '%s'.\n\n" +
+						"Unsupported options:\n\n" +
+						"%s\n\n" +
+						"Supported options:\n\n" +
+						"%s",
+						tableFactory.factoryIdentifier(),
+						remainingOptionKeys.stream()
+							.sorted()
+							.collect(Collectors.joining("\n")),
+						consumedOptionKeys.stream()
+							.sorted()
+							.collect(Collectors.joining("\n"))));
+			}
+		}
+
+		/**
+		 * Returns all options of the table.
+		 */
+		public ReadableConfig getOptions() {
+			return allOptions;
+		}
+
+		// ----------------------------------------------------------------------------------------
+
+		/**
+		 * Discovers a format factory if the given option is present in the table options; returns
+		 * {@link Optional#empty()} otherwise.
+		 */
+		private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
+				Class<F> formatFactoryClass,
+				ConfigOption<String> formatOption,
+				String formatPrefix) {
+			final String identifier = allOptions.get(formatOption);
+			if (identifier == null) {
+				return Optional.empty();
+			}
+			final F factory = discoverFactory(
+				context.getClassLoader(),
+				formatFactoryClass,
+				identifier);
+			// record all options of the discovered format factory (with their prefix applied)
+			// as consumed, so validate() does not flag them as unsupported
+			consumedOptionKeys.addAll(
+				factory.requiredOptions().stream()
+					.map(ConfigOption::key)
+					.map(k -> formatPrefix + k)
+					.collect(Collectors.toSet()));
+			consumedOptionKeys.addAll(
+				factory.optionalOptions().stream()
+					.map(ConfigOption::key)
+					.map(k -> formatPrefix + k)
+					.collect(Collectors.toSet()));
+			return Optional.of(factory);
+		}
+
+		private ReadableConfig projectOptions(String formatPrefix) {
+			return new DelegatingConfiguration(
+				allOptions,
+				formatPrefix);
+		}
+	}
+
+	/**
+	 * Default implementation of {@link DynamicTableFactory.Context}: an immutable holder of the
+	 * values passed to {@link #createTableSource} / {@link #createTableSink}.
+	 */
+	private static class DefaultDynamicTableContext implements DynamicTableFactory.Context {
+
+		private final ObjectIdentifier objectIdentifier;
+		private final CatalogTable catalogTable;
+		private final ReadableConfig configuration;
+		private final ClassLoader classLoader;
+
+		DefaultDynamicTableContext(
+				ObjectIdentifier objectIdentifier,
+				CatalogTable catalogTable,
+				ReadableConfig configuration,
+				ClassLoader classLoader) {
+			this.objectIdentifier = objectIdentifier;
+			this.catalogTable = catalogTable;
+			this.configuration = configuration;
+			this.classLoader = classLoader;
+		}
+
+		@Override
+		public ObjectIdentifier getObjectIdentifier() {
+			return objectIdentifier;
+		}
+
+		@Override
+		public CatalogTable getCatalogTable() {
+			return catalogTable;
+		}
+
+		@Override
+		public ReadableConfig getConfiguration() {
+			return configuration;
+		}
+
+		@Override
+		public ClassLoader getClassLoader() {
+			return classLoader;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private FactoryUtil() {
+		// no instantiation
+	}
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
new file mode 100644
index 0000000..184c432
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+/**
+ * Base interface for configuring a {@link ScanFormat} for a {@link ScanTableSource}.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface helps in making such formats pluggable.
+ *
+ * <p>The created {@link Format} instance is an intermediate representation that can be used to construct
+ * runtime implementation in a later step.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ *
+ * @param <I> runtime interface needed by the table source
+ */
+@PublicEvolving
+public interface ScanFormatFactory<I> extends Factory {
+
+	/**
+	 * Creates a format from the given context and format options.
+	 *
+	 * <p>The format options have been projected to top-level options (e.g. from {@code key.format.ignore-errors}
+	 * to {@code format.ignore-errors}).
+	 */
+	ScanFormat<I> createScanFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
new file mode 100644
index 0000000..9b669df
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Factory for creating a {@link SinkFormat} for {@link SerializationSchema}.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@PublicEvolving
+public interface SerializationFormatFactory extends SinkFormatFactory<SerializationSchema<RowData>> {
+  // interface is used for discovery but is already fully specified by the generics
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
new file mode 100644
index 0000000..4212ea6
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+/**
+ * Base interface for configuring a {@link SinkFormat} for a
+ * {@link org.apache.flink.table.connector.sink.DynamicTableSink}.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface helps in making such formats pluggable.
+ *
+ * <p>The created {@link Format} instance is an intermediate representation that can be used to construct
+ * runtime implementation in a later step.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ *
+ * @param <I> runtime interface needed by the table sink
+ */
+@PublicEvolving
+public interface SinkFormatFactory<I> extends Factory {
+
+	/**
+	 * Creates a format from the given context and format options.
+	 *
+	 * <p>The format options have been projected to top-level options (e.g. from {@code key.format.ignore-errors}
+	 * to {@code format.ignore-errors}).
+	 */
+	SinkFormat<I> createSinkFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
new file mode 100644
index 0000000..15f6412
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSinkMock;
+import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSourceMock;
+import org.apache.flink.table.factories.TestFormatFactory.ScanFormatMock;
+import org.apache.flink.table.factories.TestFormatFactory.SinkFormatMock;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FactoryUtil}.
+ */
+public class FactoryUtilTest {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testMissingConnector() {
+		expectError("Table options do not contain an option key 'connector' for discovering a connector.");
+		testError(options -> options.remove("connector"));
+	}
+
+	@Test
+	public void testInvalidConnector() {
+		expectError(
+			"Could not find any factory for identifier 'FAIL' that implements '" +
+				DynamicTableSourceFactory.class.getName() + "' in the classpath.\n\n" +
+			"Available factory identifiers are:\n\n" +
+			"test-connector");
+		testError(options -> options.put("connector", "FAIL"));
+	}
+
+	@Test
+	public void testMissingConnectorOption() {
+		expectError(
+			"One or more required options are missing.\n\n" +
+			"Missing required options are:\n\n" +
+			"target");
+		testError(options -> options.remove("target"));
+	}
+
+	@Test
+	public void testInvalidConnectorOption() {
+		expectError("Invalid value for option 'buffer-size'.");
+		testError(options -> options.put("buffer-size", "FAIL"));
+	}
+
+	@Test
+	public void testMissingFormat() {
+		expectError("Could not find required scan format 'value.format.kind'.");
+		testError(options -> options.remove("value.format.kind"));
+	}
+
+	@Test
+	public void testInvalidFormat() {
+		expectError(
+			"Could not find any factory for identifier 'FAIL' that implements '" +
+				DeserializationFormatFactory.class.getName() + "' in the classpath.\n\n" +
+			"Available factory identifiers are:\n\n" +
+			"test-format");
+		testError(options -> options.put("value.format.kind", "FAIL"));
+	}
+
+	@Test
+	public void testMissingFormatOption() {
+		expectError(
+			"Error creating scan format 'test-format' in option space 'key.format.'.");
+		expectError(
+			"One or more required options are missing.\n\n" +
+			"Missing required options are:\n\n" +
+			"delimiter");
+		testError(options -> options.remove("key.format.delimiter"));
+	}
+
+	@Test
+	public void testInvalidFormatOption() {
+		expectError("Invalid value for option 'fail-on-missing'.");
+		testError(options -> options.put("key.format.fail-on-missing", "FAIL"));
+	}
+
+	@Test
+	public void testUnconsumedOption() {
+		expectError(
+			"Unsupported options found for connector 'test-connector'.\n\n" +
+			"Unsupported options:\n\n" +
+			"this-is-also-not-consumed\n" +
+			"this-is-not-consumed\n\n" +
+			"Supported options:\n\n" +
+			"buffer-size\n" +
+			"connector\n" +
+			"key.format.delimiter\n" +
+			"key.format.fail-on-missing\n" +
+			"key.format.kind\n" +
+			"property-version\n" +
+			"target\n" +
+			"value.format.delimiter\n" +
+			"value.format.fail-on-missing\n" +
+			"value.format.kind");
+		testError(options -> {
+			options.put("this-is-not-consumed", "42");
+			options.put("this-is-also-not-consumed", "true");
+		});
+	}
+
+	@Test
+	public void testAllOptions() {
+		final Map<String, String> options = createAllOptions();
+		final DynamicTableSource actualSource = createTableSource(options);
+		final DynamicTableSource expectedSource = new DynamicTableSourceMock(
+			"MyTarget",
+			new ScanFormatMock(",", false),
+			new ScanFormatMock("|", true));
+		assertEquals(expectedSource, actualSource);
+		final DynamicTableSink actualSink = createTableSink(options);
+		final DynamicTableSink expectedSink = new DynamicTableSinkMock(
+			"MyTarget",
+			1000L,
+			new SinkFormatMock(","),
+			new SinkFormatMock("|"));
+		assertEquals(expectedSink, actualSink);
+	}
+
+	@Test
+	public void testOptionalFormat() {
+		final Map<String, String> options = createAllOptions();
+		options.remove("key.format.kind");
+		options.remove("key.format.delimiter");
+		final DynamicTableSource actualSource = createTableSource(options);
+		final DynamicTableSource expectedSource = new DynamicTableSourceMock(
+			"MyTarget",
+			null,
+			new ScanFormatMock("|", true));
+		assertEquals(expectedSource, actualSource);
+		final DynamicTableSink actualSink = createTableSink(options);
+		final DynamicTableSink expectedSink = new DynamicTableSinkMock(
+			"MyTarget",
+			1000L,
+			null,
+			new SinkFormatMock("|"));
+		assertEquals(expectedSink, actualSink);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private void expectError(String message) {
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException(message)));
+	}
+
+	private static void testError(Consumer<Map<String, String>> optionModifier) {
+		final Map<String, String> options = createAllOptions();
+		optionModifier.accept(options);
+		createTableSource(options);
+	}
+
+	private static Map<String, String> createAllOptions() {
+		final Map<String, String> options = new HashMap<>();
+		// we use strings here to test realistic parsing
+		options.put("property-version", "1");
+		options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+		options.put("target", "MyTarget");
+		options.put("buffer-size", "1000");
+		options.put("key.format.kind", TestFormatFactory.IDENTIFIER);
+		options.put("key.format.delimiter", ",");
+		options.put("value.format.kind", TestFormatFactory.IDENTIFIER);
+		options.put("value.format.delimiter", "|");
+		options.put("value.format.fail-on-missing", "true");
+		return options;
+	}
+
+	private static DynamicTableSource createTableSource(Map<String, String> options) {
+		return FactoryUtil.createTableSource(
+			null,
+			ObjectIdentifier.of("cat", "db", "table"),
+			new CatalogTableMock(options),
+			new Configuration(),
+			FactoryUtilTest.class.getClassLoader());
+	}
+
+	private static DynamicTableSink createTableSink(Map<String, String> options) {
+		return FactoryUtil.createTableSink(
+			null,
+			ObjectIdentifier.of("cat", "db", "table"),
+			new CatalogTableMock(options),
+			new Configuration(),
+			FactoryUtilTest.class.getClassLoader());
+	}
+
+	private static class CatalogTableMock implements CatalogTable {
+
+		final Map<String, String> options;
+
+		CatalogTableMock(Map<String, String> options) {
+			this.options = options;
+		}
+
+		@Override
+		public boolean isPartitioned() {
+			return false;
+		}
+
+		@Override
+		public List<String> getPartitionKeys() {
+			return null;
+		}
+
+		@Override
+		public CatalogTable copy(Map<String, String> options) {
+			return null;
+		}
+
+		@Override
+		public Map<String, String> toProperties() {
+			return null;
+		}
+
+		@Override
+		public Map<String, String> getProperties() {
+			return options;
+		}
+
+		@Override
+		public TableSchema getSchema() {
+			return null;
+		}
+
+		@Override
+		public String getComment() {
+			return null;
+		}
+
+		@Override
+		public CatalogBaseTable copy() {
+			return null;
+		}
+
+		@Override
+		public Optional<String> getDescription() {
+			return Optional.empty();
+		}
+
+		@Override
+		public Optional<String> getDetailedDescription() {
+			return Optional.empty();
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
new file mode 100644
index 0000000..ff5a21f
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Test implementations for {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory}.
+ */
+public final class TestDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "test-connector";
+
+	public static final ConfigOption<String> TARGET = ConfigOptions
+		.key("target")
+		.stringType()
+		.noDefaultValue();
+
+	public static final ConfigOption<Long> BUFFER_SIZE = ConfigOptions
+		.key("buffer-size")
+		.longType()
+		.defaultValue(100L);
+
+	public static final ConfigOption<String> KEY_FORMAT = ConfigOptions
+		.key("key.format.kind")
+		.stringType()
+		.noDefaultValue();
+
+	public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions
+		.key("value.format.kind")
+		.stringType()
+		.noDefaultValue();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final Optional<ScanFormat<DeserializationSchema<RowData>>> keyFormat = helper.discoverOptionalScanFormat(
+			DeserializationFormatFactory.class,
+			KEY_FORMAT,
+			FactoryUtil.KEY_FORMAT_PREFIX);
+		final ScanFormat<DeserializationSchema<RowData>> valueFormat = helper.discoverScanFormat(
+			DeserializationFormatFactory.class,
+			VALUE_FORMAT,
+			FactoryUtil.VALUE_FORMAT_PREFIX);
+		helper.validate();
+
+		return new DynamicTableSourceMock(
+			helper.getOptions().get(TARGET),
+			keyFormat.orElse(null),
+			valueFormat);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		final Optional<SinkFormat<SerializationSchema<RowData>>> keyFormat = helper.discoverOptionalSinkFormat(
+			SerializationFormatFactory.class,
+			KEY_FORMAT,
+			FactoryUtil.KEY_FORMAT_PREFIX);
+		final SinkFormat<SerializationSchema<RowData>> valueFormat = helper.discoverSinkFormat(
+			SerializationFormatFactory.class,
+			VALUE_FORMAT,
+			FactoryUtil.VALUE_FORMAT_PREFIX);
+		helper.validate();
+
+		return new DynamicTableSinkMock(
+			helper.getOptions().get(TARGET),
+			helper.getOptions().get(BUFFER_SIZE),
+			keyFormat.orElse(null),
+			valueFormat);
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(TARGET);
+		options.add(VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(BUFFER_SIZE);
+		options.add(KEY_FORMAT);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table source
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link DynamicTableSource} for testing.
+	 */
+	public static class DynamicTableSourceMock implements ScanTableSource {
+
+		public final String target;
+		public final @Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat;
+		public final ScanFormat<DeserializationSchema<RowData>> sourceValueFormat;
+
+		DynamicTableSourceMock(
+				String target,
+				@Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat,
+				ScanFormat<DeserializationSchema<RowData>> sourceValueFormat) {
+			this.target = target;
+			this.sourceKeyFormat = sourceKeyFormat;
+			this.sourceValueFormat = sourceValueFormat;
+		}
+
+		@Override
+		public ChangelogMode getChangelogMode() {
+			return null;
+		}
+
+		@Override
+		public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+			return null;
+		}
+
+		@Override
+		public DynamicTableSource copy() {
+			return null;
+		}
+
+		@Override
+		public String asSummaryString() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			DynamicTableSourceMock that = (DynamicTableSourceMock) o;
+			return target.equals(that.target) &&
+				Objects.equals(sourceKeyFormat, that.sourceKeyFormat) &&
+				sourceValueFormat.equals(that.sourceValueFormat);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(target, sourceKeyFormat, sourceValueFormat);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table sink
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link DynamicTableSink} for testing.
+	 */
+	public static class DynamicTableSinkMock implements DynamicTableSink {
+
+		public final String target;
+		public final Long bufferSize;
+		public final @Nullable SinkFormat<SerializationSchema<RowData>> sinkKeyFormat;
+		public final SinkFormat<SerializationSchema<RowData>> sinkValueFormat;
+
+		DynamicTableSinkMock(
+				String target,
+				Long bufferSize,
+				@Nullable SinkFormat<SerializationSchema<RowData>> sinkKeyFormat,
+				SinkFormat<SerializationSchema<RowData>> sinkValueFormat) {
+			this.target = target;
+			this.bufferSize = bufferSize;
+			this.sinkKeyFormat = sinkKeyFormat;
+			this.sinkValueFormat = sinkValueFormat;
+		}
+
+		@Override
+		public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+			return null;
+		}
+
+		@Override
+		public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+			return null;
+		}
+
+		@Override
+		public DynamicTableSink copy() {
+			return null;
+		}
+
+		@Override
+		public String asSummaryString() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			DynamicTableSinkMock that = (DynamicTableSinkMock) o;
+			return target.equals(that.target) &&
+				bufferSize.equals(that.bufferSize) &&
+				Objects.equals(sinkKeyFormat, that.sinkKeyFormat) &&
+				sinkValueFormat.equals(that.sinkValueFormat);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(target, bufferSize, sinkKeyFormat, sinkValueFormat);
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
new file mode 100644
index 0000000..c04bdb6
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Test implementations for {@link DeserializationFormatFactory} and {@link SerializationFormatFactory}.
+ */
+public class TestFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
+
+	public static final String IDENTIFIER = "test-format";
+
+	public static final ConfigOption<String> DELIMITER = ConfigOptions
+		.key("delimiter")
+		.stringType()
+		.noDefaultValue();
+
+	public static final ConfigOption<Boolean> FAIL_ON_MISSING = ConfigOptions
+		.key("fail-on-missing")
+		.booleanType()
+		.defaultValue(false);
+
+	@Override
+	public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+			DynamicTableFactory.Context context,
+			ReadableConfig formatConfig) {
+		FactoryUtil.validateFactoryOptions(this, formatConfig);
+		return new ScanFormatMock(formatConfig.get(DELIMITER), formatConfig.get(FAIL_ON_MISSING));
+	}
+
+	@Override
+	public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+			DynamicTableFactory.Context context,
+			ReadableConfig formatConfig) {
+		FactoryUtil.validateFactoryOptions(this, formatConfig);
+		return new SinkFormatMock(formatConfig.get(DELIMITER));
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(DELIMITER);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FAIL_ON_MISSING);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table source format
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link ScanFormat} for testing.
+	 */
+	public static class ScanFormatMock implements ScanFormat<DeserializationSchema<RowData>> {
+
+		public final String delimiter;
+		public final Boolean failOnMissing;
+
+		ScanFormatMock(String delimiter, Boolean failOnMissing) {
+			this.delimiter = delimiter;
+			this.failOnMissing = failOnMissing;
+		}
+
+		@Override
+		public DeserializationSchema<RowData> createScanFormat(
+				ScanTableSource.Context context,
+				DataType producedDataType) {
+			return null;
+		}
+
+		@Override
+		public ChangelogMode getChangelogMode() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			ScanFormatMock that = (ScanFormatMock) o;
+			return delimiter.equals(that.delimiter) && failOnMissing.equals(that.failOnMissing);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(delimiter, failOnMissing);
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Table sink format
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * {@link SinkFormat} for testing.
+	 */
+	public static class SinkFormatMock implements SinkFormat<SerializationSchema<RowData>> {
+
+		public final String delimiter;
+
+		SinkFormatMock(String delimiter) {
+			this.delimiter = delimiter;
+		}
+
+		@Override
+		public SerializationSchema<RowData> createSinkFormat(
+				ScanTableSource.Context context,
+				DataType consumeDataType) {
+			return null;
+		}
+
+		@Override
+		public ChangelogMode getChangelogMode() {
+			return null;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			SinkFormatMock that = (SinkFormatMock) o;
+			return delimiter.equals(that.delimiter);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(delimiter);
+		}
+	}
+}
diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..d31007a
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.factories.TestDynamicTableFactory
+org.apache.flink.table.factories.TestFormatFactory


[flink] 02/03: [hotfix][table-common] Enable receiving isBounded in DynamicTableSink

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5adbba8d6abff400539399e1a42cc2f7b71137ff
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Apr 30 10:30:29 2020 +0200

    [hotfix][table-common] Enable receiving isBounded in DynamicTableSink
---
 .../org/apache/flink/table/connector/sink/DynamicTableSink.java    | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
index b573916..97595f1 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
@@ -125,6 +125,13 @@ public interface DynamicTableSink {
 	interface Context {
 
 		/**
+		 * Returns whether a runtime implementation can expect a finite number of rows.
+		 *
+		 * <p>This information might be derived from the session's execution mode and/or kind of query.
+		 */
+		boolean isBounded();
+
+		/**
 		 * Creates a converter for mapping between Flink's internal data structures and objects specified
 		 * by the given {@link DataType} that can be passed into a runtime implementation.
 		 *


[flink] 01/03: [hotfix][table-common] Improve terminology around catalog table options

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5aec41d77e03f81d7d5e55d12040d09b301b1411
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Apr 30 10:29:25 2020 +0200

    [hotfix][table-common] Improve terminology around catalog table options
---
 flink-python/pyflink/table/catalog.py                  | 13 +++++++++++++
 .../apache/flink/table/catalog/CatalogBaseTable.java   | 18 +++++++++++++++---
 .../org/apache/flink/table/catalog/CatalogTable.java   | 14 +++++++-------
 3 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py
index 5fcd859..9d3a7d3 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -618,10 +618,23 @@ class CatalogBaseTable(object):
     def _get(j_catalog_base_table):
         return CatalogBaseTable(j_catalog_base_table)
 
+    def get_options(self):
+        """
+        Returns a map of string-based options.
+
+        In case of CatalogTable, these options may determine the kind of connector and its
+        configuration for accessing the data in the external system.
+
+        :return: Property map of the table/view.
+        """
+        return dict(self._j_catalog_base_table.getOptions())
+
     def get_properties(self):
         """
         Get the properties of the table.
 
+        This method is deprecated. Use :func:`~pyflink.table.CatalogBaseTable.get_options` instead.
+
         :return: Property map of the table/view.
         """
         return dict(self._j_catalog_base_table.getProperties())
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
index e2157e0..3d3a9aa 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogBaseTable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.factories.DynamicTableFactory;
 
 import java.util.Map;
 import java.util.Optional;
@@ -28,14 +29,25 @@ import java.util.Optional;
  * key-value pairs defining the properties of the table.
  */
 public interface CatalogBaseTable {
+
 	/**
-	 * Get the properties of the table.
-	 *
-	 * @return property map of the table/view
+	 * @deprecated Use {@link #getOptions()}.
 	 */
+	@Deprecated
 	Map<String, String> getProperties();
 
 	/**
+	 * Returns a map of string-based options.
+	 *
+	 * <p>In case of {@link CatalogTable}, these options may determine the kind of connector and its
+	 * configuration for accessing the data in the external system. See {@link DynamicTableFactory}
+	 * for more information.
+	 */
+	default Map<String, String> getOptions() {
+		return getProperties();
+	}
+
+	/**
 	 * Get the schema of the table.
 	 *
 	 * @return schema of the table/view.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
index 2f55880..fb51e1e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java
@@ -40,17 +40,17 @@ public interface CatalogTable extends CatalogBaseTable {
 	List<String> getPartitionKeys();
 
 	/**
-	 * Return a property map for table factory discovery purpose. The properties will be used to match a [[TableFactory]].
-	 * Please refer to {@link org.apache.flink.table.factories.TableFactory}
+	 * Returns a copy of this {@code CatalogTable} with given table options {@code options}.
 	 *
-	 * @return a map of properties
+	 * @return a new copy of this table with replaced table options
 	 */
-	Map<String, String> toProperties();
+	CatalogTable copy(Map<String, String> options);
 
 	/**
-	 * Returns a copy of this {@code CatalogTable} with given table options {@code options}.
+	 * Serializes this instance into a map of string-based properties.
 	 *
-	 * @return a new copy of this table with replaced table options
+	 * <p>Compared to the pure table options in {@link #getOptions()}, the map includes schema,
+	 * partitioning, and other characteristics in a serialized form.
 	 */
-	CatalogTable copy(Map<String, String> options);
+	Map<String, String> toProperties();
 }