You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/05/05 18:48:54 UTC
[flink] 03/03: [FLINK-16997][table-common] Add new factory
interfaces and discovery utilities
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fef974d7df1a9dabd62f3288c086185d69298bc1
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Apr 30 11:03:47 2020 +0200
[FLINK-16997][table-common] Add new factory interfaces and discovery utilities
Implements the new factory interfaces mentioned in FLIP-95. Adds new factory utilities
that can be used for FLIP-122 and future factories.
It adds TestDynamicTableFactory and TestFormatFactory for reference implementations
of new factories.
This closes #11959.
---
.../table/tests/test_catalog_completeness.py | 1 +
.../org/apache/flink/table/catalog/Catalog.java | 20 +
.../flink/table/connector/format/Format.java | 55 ++
.../flink/table/connector/format/ScanFormat.java | 37 ++
.../flink/table/connector/format/SinkFormat.java | 38 ++
.../factories/DeserializationFormatFactory.java | 34 ++
.../flink/table/factories/DynamicTableFactory.java | 73 +++
.../table/factories/DynamicTableSinkFactory.java | 42 ++
.../table/factories/DynamicTableSourceFactory.java | 42 ++
.../org/apache/flink/table/factories/Factory.java | 79 +++
.../apache/flink/table/factories/FactoryUtil.java | 570 +++++++++++++++++++++
.../flink/table/factories/ScanFormatFactory.java | 50 ++
.../factories/SerializationFormatFactory.java | 34 ++
.../flink/table/factories/SinkFormatFactory.java | 50 ++
.../flink/table/factories/FactoryUtilTest.java | 285 +++++++++++
.../table/factories/TestDynamicTableFactory.java | 257 ++++++++++
.../flink/table/factories/TestFormatFactory.java | 180 +++++++
.../org.apache.flink.table.factories.Factory | 17 +
18 files changed, 1864 insertions(+)
diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index 3e83357..245a2be 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -43,6 +43,7 @@ class CatalogAPICompletenessTests(PythonAPICompletenessTestCase, unittest.TestCa
return {
'open',
'close',
+ 'getFactory',
'getTableFactory',
'getFunctionDefinitionFactory',
'listPartitionsByFilter'}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index b4ff5f3..1e4c482 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -35,6 +35,8 @@ import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactory;
@@ -49,11 +51,29 @@ import java.util.Optional;
public interface Catalog {
/**
+ * Returns a factory for creating instances from catalog objects.
+ *
+ * <p>This method enables bypassing the discovery process. Implementers can directly pass internal
+ * catalog-specific objects to their own factory. For example, a custom {@link CatalogTable} can
+ * be processed by a custom {@link DynamicTableFactory}.
+ *
+ * <p>Because all factories are interfaces, the returned {@link Factory} instance can implement multiple
+ * supported extension points. An {@code instanceof} check is performed by the caller that checks
+ * whether a required factory is implemented; otherwise the discovery process is used.
+ */
+ default Optional<Factory> getFactory() {
+ return Optional.empty();
+ }
+
+ /**
* Get an optional {@link TableFactory} instance that's responsible for generating table-related
* instances stored in this catalog, instances such as source/sink.
*
* @return an optional TableFactory instance
+ * @deprecated Use {@link #getFactory()} for the new factory stack. The new factory stack uses the
+ * new table sources and sinks defined in FLIP-95 and a slightly different discovery mechanism.
*/
+ @Deprecated
default Optional<TableFactory> getTableFactory() {
return Optional.empty();
}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
new file mode 100644
index 0000000..40c0115
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+/**
+ * Base interface for connector formats.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface is an intermediate representation before constructing actual
+ * runtime implementation.
+ *
+ * <p>Formats can be distinguished along two dimensions:
+ * <ul>
+ * <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}).
+ * <li>Runtime implementation interface that is required (e.g. {@link DeserializationSchema} or
+ * some bulk interface).</li>
+ * </ul>
+ *
+ * <p>A {@link DynamicTableFactory} can search for a format that it is accepted by the connector.
+ *
+ * @see ScanFormat
+ * @see SinkFormat
+ */
+@PublicEvolving
+public interface Format {
+
+ /**
+ * Returns the set of changes that a connector (and transitively the planner) can expect during
+ * runtime.
+ */
+ ChangelogMode getChangelogMode();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
new file mode 100644
index 0000000..529ea37
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * A {@link Format} for a {@link ScanTableSource}.
+ *
+ * @param <I> runtime interface needed by the table source
+ */
+@PublicEvolving
+public interface ScanFormat<I> extends Format {
+
+ /**
+ * Creates runtime implementation that is configured to produce data of the given data type.
+ */
+ I createScanFormat(ScanTableSource.Context context, DataType producedDataType);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
new file mode 100644
index 0000000..0b67336
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.format;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.DataType;
+
+/**
+ * A {@link Format} for a {@link DynamicTableSink}.
+ *
+ * @param <I> runtime interface needed by the table sink
+ */
+@PublicEvolving
+public interface SinkFormat<I> extends Format {
+
+ /**
+ * Creates runtime implementation that is configured to consume data of the given data type.
+ */
+ I createSinkFormat(ScanTableSource.Context context, DataType consumedDataType);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
new file mode 100644
index 0000000..1901fc5
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Factory for creating a {@link ScanFormat} for {@link DeserializationSchema}.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@PublicEvolving
+public interface DeserializationFormatFactory extends ScanFormatFactory<DeserializationSchema<RowData>> {
+ // interface is used for discovery but is already fully specified by the generics
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
new file mode 100644
index 0000000..0d4f8ba
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+
+/**
+ * Base interface for configuring a dynamic table connector for an external storage system from catalog
+ * and session information.
+ *
+ * <p>Dynamic tables are the core concept of Flink's Table & SQL API for processing both bounded and
+ * unbounded data in a unified fashion.
+ *
+ * <p>Implement {@link DynamicTableSourceFactory} for constructing a {@link DynamicTableSource}.
+ *
+ * <p>Implement {@link DynamicTableSinkFactory} for constructing a {@link DynamicTableSink}.
+ *
+ * <p>The options {@link FactoryUtil#PROPERTY_VERSION} and {@link FactoryUtil#CONNECTOR} are implicitly
+ * added and must not be declared.
+ */
+@PublicEvolving
+public interface DynamicTableFactory extends Factory {
+
+ /**
+ * Provides catalog and session information describing the dynamic table to be accessed.
+ */
+ interface Context {
+
+ /**
+ * Returns the identifier of the table in the {@link Catalog}.
+ */
+ ObjectIdentifier getObjectIdentifier();
+
+ /**
+ * Returns table information received from the {@link Catalog}.
+ */
+ CatalogTable getCatalogTable();
+
+ /**
+ * Gives read-only access to the configuration of the current session.
+ */
+ ReadableConfig getConfiguration();
+
+ /**
+ * Returns the class loader of the current session.
+ *
+ * <p>The class loader is in particular useful for discovering further (nested) factories.
+ */
+ ClassLoader getClassLoader();
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java
new file mode 100644
index 0000000..7f428fe
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSinkFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+
+/**
+ * Creates a {@link DynamicTableSink} instance from a {@link CatalogTable} and additional context
+ * information.
+ *
+ * <p>See {@link Factory} for more information about the general design of a factory.
+ */
+@PublicEvolving
+public interface DynamicTableSinkFactory extends DynamicTableFactory {
+
+ /**
+ * Creates a {@link DynamicTableSink} instance from a {@link CatalogTable} and additional context
+ * information.
+ *
+ * <p>An implementation should perform validation and the discovery of further (nested) factories
+ * in this method.
+ */
+ DynamicTableSink createDynamicTableSink(Context context);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java
new file mode 100644
index 0000000..3c0ecd1
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableSourceFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+
+/**
+ * Creates a {@link DynamicTableSource} instance from a {@link CatalogTable} and additional context
+ * information.
+ *
+ * <p>See {@link Factory} for more information about the general design of a factory.
+ */
+@PublicEvolving
+public interface DynamicTableSourceFactory extends DynamicTableFactory {
+
+ /**
+ * Creates a {@link DynamicTableSource} instance from a {@link CatalogTable} and additional context
+ * information.
+ *
+ * <p>An implementation should perform validation and the discovery of further (nested) factories
+ * in this method.
+ */
+ DynamicTableSource createDynamicTableSource(Context context);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java
new file mode 100644
index 0000000..b669e76
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/Factory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.util.Set;
+
+/**
+ * Base interface for all kind of factories that create object instances from a list of key-value pairs
+ * in Flink's Table & SQL API.
+ *
+ * <p>A factory is uniquely identified by {@link Class} and {@link #factoryIdentifier()}.
+ *
+ * <p>The list of available factories is discovered using Java's Service Provider Interfaces (SPI). Classes
+ * that implement this interface can be added to {@code META_INF/services/org.apache.flink.table.factories.Factory}
+ * in JAR files.
+ *
+ * <p>Every factory declares a set of required and optional options. This information will not be used
+ * during discovery but is helpful when generating documentation and performing validation. A factory may
+ * discover further (nested) factories, the options of the nested factories must not be declared in the
+ * sets of this factory.
+ *
+ * <p>It is the responsibility of each factory to perform validation before returning an instance.
+ *
+ * <p>For consistency, the following style for key names of {@link ConfigOption} is recommended:
+ * <ul>
+ * <li>Try to <b>reuse</b> key names as much as possible. Use other factory implementations as an example.
+ * <li>Key names should be declared in <b>lower case</b>. Use "-" instead of dots or camel case to split words.
+ * <li>Key names should be <b>hierarchical</b> where appropriate. Think about how one would define such
+ * a hierarchy in JSON or YAML file (e.g. {@code sink.bulk-flush.max-actions}).
+ * <li>In case of a hierarchy, try not to use the higher level again in the key name (e.g. do {@code sink.partitioner}
+ * instead of {@code sink.sink-partitioner}) to <b>keep the keys short</b>.
+ * </ul>
+ */
+@PublicEvolving
+public interface Factory {
+
+ /**
+ * Returns a unique identifier among same factory interfaces.
+ *
+ * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code kafka}). If
+ * multiple factories exist for different versions, a version should be appended using "-" (e.g. {@code kafka-0.10}).
+ */
+ String factoryIdentifier();
+
+ /**
+ * Returns a set of {@link ConfigOption} that an implementation of this factory requires in addition to
+ * {@link #optionalOptions()}.
+ *
+ * <p>See the documentation of {@link Factory} for more information.
+ */
+ Set<ConfigOption<?>> requiredOptions();
+
+ /**
+ * Returns a set of {@link ConfigOption} that an implementation of this factory consumes in addition to
+ * {@link #requiredOptions()}.
+ *
+ * <p>See the documentation of {@link Factory} for more information.
+ */
+ Set<ConfigOption<?>> optionalOptions();
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
new file mode 100644
index 0000000..fa42b8f
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for working with {@link Factory}s.
+ */
+@PublicEvolving
+public final class FactoryUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+
+ public static final ConfigOption<Integer> PROPERTY_VERSION = ConfigOptions.key("property-version")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+ "Version of the overall property design. This option is meant for future backwards compatibility.");
+
+ public static final ConfigOption<String> CONNECTOR = ConfigOptions.key("connector")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Uniquely identifies the connector of a dynamic table that is used for accessing data in " +
+ "an external system. Its value is used during table source and table sink discovery.");
+
+ public static final String FORMAT_PREFIX = "format.";
+
+ public static final String KEY_FORMAT_PREFIX = "key.format.";
+
+ public static final String VALUE_FORMAT_PREFIX = "value.format.";
+
+ /**
+ * Creates a {@link DynamicTableSource} from a {@link CatalogTable}.
+ *
+ * <p>It considers {@link Catalog#getFactory()} if provided.
+ */
+ public static DynamicTableSource createTableSource(
+ @Nullable Catalog catalog,
+ ObjectIdentifier objectIdentifier,
+ CatalogTable catalogTable,
+ ReadableConfig configuration,
+ ClassLoader classLoader) {
+ final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+ objectIdentifier,
+ catalogTable,
+ configuration,
+ classLoader);
+ try {
+ final DynamicTableSourceFactory factory = getDynamicTableFactory(
+ DynamicTableSourceFactory.class,
+ catalog,
+ context);
+ return factory.createDynamicTableSource(context);
+ } catch (Throwable t) {
+ throw new ValidationException(
+ String.format(
+ "Unable to create a source for reading table '%s'.\n\n" +
+ "Table options are:\n\n" +
+ "%s",
+ objectIdentifier.asSummaryString(),
+ catalogTable.getOptions()
+ .entrySet()
+ .stream()
+ .map(e -> stringifyOption(e.getKey(), e.getValue()))
+ .sorted()
+ .collect(Collectors.joining("\n"))),
+ t);
+ }
+ }
+
+ /**
+ * Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
+ *
+ * <p>It considers {@link Catalog#getFactory()} if provided.
+ */
+ public static DynamicTableSink createTableSink(
+ @Nullable Catalog catalog,
+ ObjectIdentifier objectIdentifier,
+ CatalogTable catalogTable,
+ ReadableConfig configuration,
+ ClassLoader classLoader) {
+ final DefaultDynamicTableContext context = new DefaultDynamicTableContext(
+ objectIdentifier,
+ catalogTable,
+ configuration,
+ classLoader);
+ try {
+ final DynamicTableSinkFactory factory = getDynamicTableFactory(
+ DynamicTableSinkFactory.class,
+ catalog,
+ context);
+ return factory.createDynamicTableSink(context);
+ } catch (Throwable t) {
+ throw new ValidationException(
+ String.format(
+ "Unable to create a sink for writing table '%s'.\n\n" +
+ "Table options are:\n\n" +
+ "%s",
+ objectIdentifier.asSummaryString(),
+ catalogTable.getOptions()
+ .entrySet()
+ .stream()
+ .map(e -> stringifyOption(e.getKey(), e.getValue()))
+ .sorted()
+ .collect(Collectors.joining("\n"))),
+ t);
+ }
+ }
+
+ /**
+ * Creates a utility that helps in discovering formats and validating all options for a {@link DynamicTableFactory}.
+ *
+ * <p>The following example sketches the usage:
+ * <pre>{@code
+ * // in createDynamicTableSource()
+ * helper = FactoryUtil.createTableFactoryHelper(this, context);
+ * keyFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, KEY_OPTION, "prefix");
+ * valueFormat = helper.discoverScanFormat(classloader, MyFormatFactory.class, VALUE_OPTION, "prefix");
+ * helper.validate();
+ * ... // construct connector with discovered formats
+ * }</pre>
+ *
+ * <p>Note: This utility checks for left-over options in the final step.
+ */
+ public static TableFactoryHelper createTableFactoryHelper(
+ DynamicTableFactory factory,
+ DynamicTableFactory.Context context) {
+ return new TableFactoryHelper(factory, context);
+ }
+
+ /**
+ * Discovers a factory using the given factory base class and identifier.
+ *
+ * <p>This method is meant for cases where {@link #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)}
+ * {@link #createTableSource(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)},
+ * and {@link #createTableSink(Catalog, ObjectIdentifier, CatalogTable, ReadableConfig, ClassLoader)}
+ * are not applicable.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T extends Factory> T discoverFactory(
+ ClassLoader classLoader,
+ Class<T> factoryClass,
+ String factoryIdentifier) {
+ final List<Factory> factories = discoverFactories(classLoader);
+
+ final List<Factory> foundFactories = factories.stream()
+ .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+ .collect(Collectors.toList());
+
+ if (foundFactories.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Could not find any factories that implement '%s' in the classpath.",
+ factoryClass.getName()));
+ }
+
+ final List<Factory> matchingFactories = foundFactories.stream()
+ .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+ .collect(Collectors.toList());
+
+ if (matchingFactories.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n" +
+ "Available factory identifiers are:\n\n" +
+ "%s",
+ factoryIdentifier,
+ factoryClass.getName(),
+ foundFactories.stream()
+ .map(Factory::factoryIdentifier)
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+ if (matchingFactories.size() > 1) {
+ throw new ValidationException(
+ String.format(
+ "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n" +
+ "Ambiguous factory classes are:\n\n" +
+ "%s",
+ factoryIdentifier,
+ factoryClass.getName(),
+ foundFactories.stream()
+ .map(f -> factories.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ return (T) matchingFactories.get(0);
+ }
+
+ /**
+ * Validates the required and optional {@link ConfigOption}s of a factory.
+ *
+ * <p>Note: It does not check for left-over options.
+ */
+ public static void validateFactoryOptions(Factory factory, ReadableConfig options) {
+ // currently Flink's options have no validation feature which is why we access them eagerly
+ // to provoke a parsing error
+
+ final List<String> missingRequiredOptions = factory.requiredOptions().stream()
+ .filter(option -> readOption(options, option) == null)
+ .map(ConfigOption::key)
+ .sorted()
+ .collect(Collectors.toList());
+
+ if (!missingRequiredOptions.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "One or more required options are missing.\n\n" +
+ "Missing required options are:\n\n" +
+ "%s",
+ String.join("\n", missingRequiredOptions)));
+ }
+
+ factory.optionalOptions()
+ .forEach(option -> readOption(options, option));
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Helper methods
+ // --------------------------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ private static <T extends DynamicTableFactory> T getDynamicTableFactory(
+ Class<T> factoryClass,
+ @Nullable Catalog catalog,
+ DefaultDynamicTableContext context) {
+ // catalog factory has highest precedence
+ if (catalog != null) {
+ final Factory factory = catalog.getFactory()
+ .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+ .orElse(null);
+ if (factory != null) {
+ return (T) factory;
+ }
+ }
+
+ // fallback to factory discovery
+ final String connectorOption = context.getCatalogTable()
+ .getOptions()
+ .get(CONNECTOR.key());
+ if (connectorOption == null) {
+ throw new ValidationException(
+ String.format(
+ "Table options do not contain an option key '%s' for discovering a connector.",
+ CONNECTOR.key()));
+ }
+ try {
+ return discoverFactory(context.getClassLoader(), factoryClass, connectorOption);
+ } catch (ValidationException e) {
+ throw new ValidationException(
+ String.format(
+ "Cannot discover a connector using option '%s'.",
+ stringifyOption(CONNECTOR.key(), connectorOption)),
+ e);
+ }
+ }
+
+ private static List<Factory> discoverFactories(ClassLoader classLoader) {
+ try {
+ final List<Factory> result = new LinkedList<>();
+ ServiceLoader
+ .load(Factory.class, classLoader)
+ .iterator()
+ .forEachRemaining(result::add);
+ return result;
+ } catch (ServiceConfigurationError e) {
+ LOG.error("Could not load service provider for factories.", e);
+ throw new TableException("Could not load service provider for factories.", e);
+ }
+ }
+
+ private static String stringifyOption(String key, String value) {
+ return String.format(
+ "'%s'='%s'",
+ EncodingUtils.escapeSingleQuotes(key),
+ EncodingUtils.escapeSingleQuotes(value));
+ }
+
+ private static Configuration asConfiguration(Map<String, String> options) {
+ final Configuration configuration = new Configuration();
+ options.forEach(configuration::setString);
+ return configuration;
+ }
+
+ private static <T> T readOption(ReadableConfig options, ConfigOption<T> option) {
+ try {
+ return options.get(option);
+ } catch (Throwable t) {
+ throw new ValidationException(String.format("Invalid value for option '%s'.", option.key()), t);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Helper classes
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Helper utility for discovering formats and validating all options for a {@link DynamicTableFactory}.
+ *
+ * @see #createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+ public static class TableFactoryHelper {
+
+ private final DynamicTableFactory tableFactory;
+
+ private final DynamicTableFactory.Context context;
+
+ private final Configuration allOptions;
+
+ private final Set<String> consumedOptionKeys;
+
+ private TableFactoryHelper(DynamicTableFactory tableFactory, DynamicTableFactory.Context context) {
+ this.tableFactory = tableFactory;
+ this.context = context;
+ this.allOptions = asConfiguration(context.getCatalogTable().getOptions());
+ this.consumedOptionKeys = new HashSet<>();
+ this.consumedOptionKeys.add(PROPERTY_VERSION.key());
+ this.consumedOptionKeys.add(CONNECTOR.key());
+ this.consumedOptionKeys.addAll(
+ tableFactory.requiredOptions().stream()
+ .map(ConfigOption::key)
+ .collect(Collectors.toSet()));
+ this.consumedOptionKeys.addAll(
+ tableFactory.optionalOptions().stream()
+ .map(ConfigOption::key)
+ .collect(Collectors.toSet()));
+ }
+
+ /**
+ * Discovers a {@link ScanFormat} of the given type using the given option as factory identifier.
+ *
+ * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+ */
+ public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat(
+ Class<F> formatFactoryClass,
+ ConfigOption<String> formatOption,
+ String formatPrefix) {
+ return discoverOptionalScanFormat(formatFactoryClass, formatOption, formatPrefix)
+ .orElseThrow(() ->
+ new ValidationException(
+ String.format("Could not find required scan format '%s'.", formatOption.key())));
+ }
+
+ /**
+ * Discovers a {@link ScanFormat} of the given type using the given option (if present) as factory
+ * identifier.
+ *
+ * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+ */
+ public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat(
+ Class<F> formatFactoryClass,
+ ConfigOption<String> formatOption,
+ String formatPrefix) {
+ return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix)
+ .map(formatFactory -> {
+ try {
+ return formatFactory.createScanFormat(context, projectOptions(formatPrefix));
+ } catch (Throwable t) {
+ throw new ValidationException(
+ String.format(
+ "Error creating scan format '%s' in option space '%s'.",
+ formatFactory.factoryIdentifier(),
+ formatPrefix),
+ t);
+ }
+ });
+ }
+
+ /**
+ * Discovers a {@link SinkFormat} of the given type using the given option as factory identifier.
+ *
+ * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+ */
+ public <I, F extends SinkFormatFactory<I>> SinkFormat<I> discoverSinkFormat(
+ Class<F> formatFactoryClass,
+ ConfigOption<String> formatOption,
+ String formatPrefix) {
+ return discoverOptionalSinkFormat(formatFactoryClass, formatOption, formatPrefix)
+ .orElseThrow(() ->
+ new ValidationException(
+ String.format("Could not find required sink format '%s'.", formatOption.key())));
+ }
+
+ /**
+ * Discovers a {@link SinkFormat} of the given type using the given option (if present) as factory
+ * identifier.
+ *
+ * <p>A prefix, e.g. {@link #KEY_FORMAT_PREFIX}, projects the options for the format factory.
+ */
+ public <I, F extends SinkFormatFactory<I>> Optional<SinkFormat<I>> discoverOptionalSinkFormat(
+ Class<F> formatFactoryClass,
+ ConfigOption<String> formatOption,
+ String formatPrefix) {
+ return discoverOptionalFormatFactory(formatFactoryClass, formatOption, formatPrefix)
+ .map(formatFactory -> {
+ try {
+ return formatFactory.createSinkFormat(context, projectOptions(formatPrefix));
+ } catch (Throwable t) {
+ throw new ValidationException(
+ String.format(
+ "Error creating sink format '%s' in option space '%s'.",
+ formatFactory.factoryIdentifier(),
+ formatPrefix),
+ t);
+ }
+ });
+ }
+
+ /**
+ * Validates the options of the {@link DynamicTableFactory}. It checks for unconsumed option
+ * keys.
+ */
+ public void validate() {
+ validateFactoryOptions(tableFactory, allOptions);
+ final Set<String> remainingOptionKeys = new HashSet<>(allOptions.keySet());
+ remainingOptionKeys.removeAll(consumedOptionKeys);
+ if (remainingOptionKeys.size() > 0) {
+ throw new ValidationException(
+ String.format(
+ "Unsupported options found for connector '%s'.\n\n" +
+ "Unsupported options:\n\n" +
+ "%s\n\n" +
+ "Supported options:\n\n" +
+ "%s",
+ tableFactory.factoryIdentifier(),
+ remainingOptionKeys.stream()
+ .sorted()
+ .collect(Collectors.joining("\n")),
+ consumedOptionKeys.stream()
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+ }
+
+ /**
+ * Returns all options of the table.
+ */
+ public ReadableConfig getOptions() {
+ return allOptions;
+ }
+
+ // ----------------------------------------------------------------------------------------
+
+ private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
+ Class<F> formatFactoryClass,
+ ConfigOption<String> formatOption,
+ String formatPrefix) {
+ final String identifier = allOptions.get(formatOption);
+ if (identifier == null) {
+ return Optional.empty();
+ }
+ final F factory = discoverFactory(
+ context.getClassLoader(),
+ formatFactoryClass,
+ identifier);
+ // log all used options of other factories
+ consumedOptionKeys.addAll(
+ factory.requiredOptions().stream()
+ .map(ConfigOption::key)
+ .map(k -> formatPrefix + k)
+ .collect(Collectors.toSet()));
+ consumedOptionKeys.addAll(
+ factory.optionalOptions().stream()
+ .map(ConfigOption::key)
+ .map(k -> formatPrefix + k)
+ .collect(Collectors.toSet()));
+ return Optional.of(factory);
+ }
+
+ private ReadableConfig projectOptions(String formatPrefix) {
+ return new DelegatingConfiguration(
+ allOptions,
+ formatPrefix);
+ }
+ }
+
+ private static class DefaultDynamicTableContext implements DynamicTableFactory.Context {
+
+ private final ObjectIdentifier objectIdentifier;
+ private final CatalogTable catalogTable;
+ private final ReadableConfig configuration;
+ private final ClassLoader classLoader;
+
+ DefaultDynamicTableContext(
+ ObjectIdentifier objectIdentifier,
+ CatalogTable catalogTable,
+ ReadableConfig configuration,
+ ClassLoader classLoader) {
+ this.objectIdentifier = objectIdentifier;
+ this.catalogTable = catalogTable;
+ this.configuration = configuration;
+ this.classLoader = classLoader;
+ }
+
+ @Override
+ public ObjectIdentifier getObjectIdentifier() {
+ return objectIdentifier;
+ }
+
+ @Override
+ public CatalogTable getCatalogTable() {
+ return catalogTable;
+ }
+
+ @Override
+ public ReadableConfig getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private FactoryUtil() {
+ // no instantiation
+ }
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
new file mode 100644
index 0000000..184c432
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+/**
+ * Base interface for configuring a {@link ScanFormat} for a {@link ScanTableSource}.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface helps in making such formats pluggable.
+ *
+ * <p>The created {@link Format} instance is an intermediate representation that can be used to construct
+ * runtime implementation in a later step.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ *
+ * @param <I> runtime interface needed by the table source
+ */
+@PublicEvolving
+public interface ScanFormatFactory<I> extends Factory {
+
+ /**
+ * Creates a format from the given context and format options.
+ *
+ * <p>The format options have been projected to top-level options (e.g. from {@code key.format.ignore-errors}
+ * to {@code format.ignore-errors}).
+ */
+ ScanFormat<I> createScanFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
new file mode 100644
index 0000000..9b669df
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Factory for creating a {@link SinkFormat} for {@link SerializationSchema}.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@PublicEvolving
+public interface SerializationFormatFactory extends SinkFormatFactory<SerializationSchema<RowData>> {
+ // interface is used for discovery but is already fully specified by the generics
+}
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
new file mode 100644
index 0000000..4212ea6
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+
+/**
+ * Base interface for configuring a {@link SinkFormat} for a {@link ScanTableSource}.
+ *
+ * <p>Depending on the kind of external system, a connector might support different encodings for
+ * reading and writing rows. This interface helps in making such formats pluggable.
+ *
+ * <p>The created {@link Format} instance is an intermediate representation that can be used to construct
+ * runtime implementation in a later step.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ *
+ * @param <I> runtime interface needed by the table sink
+ */
+@PublicEvolving
+public interface SinkFormatFactory<I> extends Factory {
+
+ /**
+ * Creates a format from the given context and format options.
+ *
+ * <p>The format options have been projected to top-level options (e.g. from {@code key.format.ignore-errors}
+ * to {@code format.ignore-errors}).
+ */
+ SinkFormat<I> createSinkFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
new file mode 100644
index 0000000..15f6412
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSinkMock;
+import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSourceMock;
+import org.apache.flink.table.factories.TestFormatFactory.ScanFormatMock;
+import org.apache.flink.table.factories.TestFormatFactory.SinkFormatMock;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link FactoryUtil}.
+ */
+public class FactoryUtilTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testMissingConnector() {
+ expectError("Table options do not contain an option key 'connector' for discovering a connector.");
+ testError(options -> options.remove("connector"));
+ }
+
+ @Test
+ public void testInvalidConnector() {
+ expectError(
+ "Could not find any factory for identifier 'FAIL' that implements '" +
+ DynamicTableSourceFactory.class.getName() + "' in the classpath.\n\n" +
+ "Available factory identifiers are:\n\n" +
+ "test-connector");
+ testError(options -> options.put("connector", "FAIL"));
+ }
+
+ @Test
+ public void testMissingConnectorOption() {
+ expectError(
+ "One or more required options are missing.\n\n" +
+ "Missing required options are:\n\n" +
+ "target");
+ testError(options -> options.remove("target"));
+ }
+
+ @Test
+ public void testInvalidConnectorOption() {
+ expectError("Invalid value for option 'buffer-size'.");
+ testError(options -> options.put("buffer-size", "FAIL"));
+ }
+
+ @Test
+ public void testMissingFormat() {
+ expectError("Could not find required scan format 'value.format.kind'.");
+ testError(options -> options.remove("value.format.kind"));
+ }
+
+ @Test
+ public void testInvalidFormat() {
+ expectError(
+ "Could not find any factory for identifier 'FAIL' that implements '" +
+ DeserializationFormatFactory.class.getName() + "' in the classpath.\n\n" +
+ "Available factory identifiers are:\n\n" +
+ "test-format");
+ testError(options -> options.put("value.format.kind", "FAIL"));
+ }
+
+ @Test
+ public void testMissingFormatOption() {
+ expectError(
+ "Error creating scan format 'test-format' in option space 'key.format.'.");
+ expectError(
+ "One or more required options are missing.\n\n" +
+ "Missing required options are:\n\n" +
+ "delimiter");
+ testError(options -> options.remove("key.format.delimiter"));
+ }
+
+ @Test
+ public void testInvalidFormatOption() {
+ expectError("Invalid value for option 'fail-on-missing'.");
+ testError(options -> options.put("key.format.fail-on-missing", "FAIL"));
+ }
+
+ @Test
+ public void testUnconsumedOption() {
+ expectError(
+ "Unsupported options found for connector 'test-connector'.\n\n" +
+ "Unsupported options:\n\n" +
+ "this-is-also-not-consumed\n" +
+ "this-is-not-consumed\n\n" +
+ "Supported options:\n\n" +
+ "buffer-size\n" +
+ "connector\n" +
+ "key.format.delimiter\n" +
+ "key.format.fail-on-missing\n" +
+ "key.format.kind\n" +
+ "property-version\n" +
+ "target\n" +
+ "value.format.delimiter\n" +
+ "value.format.fail-on-missing\n" +
+ "value.format.kind");
+ testError(options -> {
+ options.put("this-is-not-consumed", "42");
+ options.put("this-is-also-not-consumed", "true");
+ });
+ }
+
+ @Test
+ public void testAllOptions() {
+ final Map<String, String> options = createAllOptions();
+ final DynamicTableSource actualSource = createTableSource(options);
+ final DynamicTableSource expectedSource = new DynamicTableSourceMock(
+ "MyTarget",
+ new ScanFormatMock(",", false),
+ new ScanFormatMock("|", true));
+ assertEquals(expectedSource, actualSource);
+ final DynamicTableSink actualSink = createTableSink(options);
+ final DynamicTableSink expectedSink = new DynamicTableSinkMock(
+ "MyTarget",
+ 1000L,
+ new SinkFormatMock(","),
+ new SinkFormatMock("|"));
+ assertEquals(expectedSink, actualSink);
+ }
+
+ @Test
+ public void testOptionalFormat() {
+ final Map<String, String> options = createAllOptions();
+ options.remove("key.format.kind");
+ options.remove("key.format.delimiter");
+ final DynamicTableSource actualSource = createTableSource(options);
+ final DynamicTableSource expectedSource = new DynamicTableSourceMock(
+ "MyTarget",
+ null,
+ new ScanFormatMock("|", true));
+ assertEquals(expectedSource, actualSource);
+ final DynamicTableSink actualSink = createTableSink(options);
+ final DynamicTableSink expectedSink = new DynamicTableSinkMock(
+ "MyTarget",
+ 1000L,
+ null,
+ new SinkFormatMock("|"));
+ assertEquals(expectedSink, actualSink);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private void expectError(String message) {
+ thrown.expect(ValidationException.class);
+ thrown.expect(containsCause(new ValidationException(message)));
+ }
+
+ private static void testError(Consumer<Map<String, String>> optionModifier) {
+ final Map<String, String> options = createAllOptions();
+ optionModifier.accept(options);
+ createTableSource(options);
+ }
+
+ private static Map<String, String> createAllOptions() {
+ final Map<String, String> options = new HashMap<>();
+ // we use strings here to test realistic parsing
+ options.put("property-version", "1");
+ options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+ options.put("target", "MyTarget");
+ options.put("buffer-size", "1000");
+ options.put("key.format.kind", TestFormatFactory.IDENTIFIER);
+ options.put("key.format.delimiter", ",");
+ options.put("value.format.kind", TestFormatFactory.IDENTIFIER);
+ options.put("value.format.delimiter", "|");
+ options.put("value.format.fail-on-missing", "true");
+ return options;
+ }
+
+ private static DynamicTableSource createTableSource(Map<String, String> options) {
+ return FactoryUtil.createTableSource(
+ null,
+ ObjectIdentifier.of("cat", "db", "table"),
+ new CatalogTableMock(options),
+ new Configuration(),
+ FactoryUtilTest.class.getClassLoader());
+ }
+
+ private static DynamicTableSink createTableSink(Map<String, String> options) {
+ return FactoryUtil.createTableSink(
+ null,
+ ObjectIdentifier.of("cat", "db", "table"),
+ new CatalogTableMock(options),
+ new Configuration(),
+ FactoryUtilTest.class.getClassLoader());
+ }
+
+ private static class CatalogTableMock implements CatalogTable {
+
+ final Map<String, String> options;
+
+ CatalogTableMock(Map<String, String> options) {
+ this.options = options;
+ }
+
+ @Override
+ public boolean isPartitioned() {
+ return false;
+ }
+
+ @Override
+ public List<String> getPartitionKeys() {
+ return null;
+ }
+
+ @Override
+ public CatalogTable copy(Map<String, String> options) {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> toProperties() {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return options;
+ }
+
+ @Override
+ public TableSchema getSchema() {
+ return null;
+ }
+
+ @Override
+ public String getComment() {
+ return null;
+ }
+
+ @Override
+ public CatalogBaseTable copy() {
+ return null;
+ }
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ return Optional.empty();
+ }
+ }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
new file mode 100644
index 0000000..ff5a21f
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Test implementations for {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory}.
+ */
+public final class TestDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+ public static final String IDENTIFIER = "test-connector";
+
+ public static final ConfigOption<String> TARGET = ConfigOptions
+ .key("target")
+ .stringType()
+ .noDefaultValue();
+
+ public static final ConfigOption<Long> BUFFER_SIZE = ConfigOptions
+ .key("buffer-size")
+ .longType()
+ .defaultValue(100L);
+
+ public static final ConfigOption<String> KEY_FORMAT = ConfigOptions
+ .key("key.format.kind")
+ .stringType()
+ .noDefaultValue();
+
+ public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions
+ .key("value.format.kind")
+ .stringType()
+ .noDefaultValue();
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+ final Optional<ScanFormat<DeserializationSchema<RowData>>> keyFormat = helper.discoverOptionalScanFormat(
+ DeserializationFormatFactory.class,
+ KEY_FORMAT,
+ FactoryUtil.KEY_FORMAT_PREFIX);
+ final ScanFormat<DeserializationSchema<RowData>> valueFormat = helper.discoverScanFormat(
+ DeserializationFormatFactory.class,
+ VALUE_FORMAT,
+ FactoryUtil.VALUE_FORMAT_PREFIX);
+ helper.validate();
+
+ return new DynamicTableSourceMock(
+ helper.getOptions().get(TARGET),
+ keyFormat.orElse(null),
+ valueFormat);
+ }
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+ final Optional<SinkFormat<SerializationSchema<RowData>>> keyFormat = helper.discoverOptionalSinkFormat(
+ SerializationFormatFactory.class,
+ KEY_FORMAT,
+ FactoryUtil.KEY_FORMAT_PREFIX);
+ final SinkFormat<SerializationSchema<RowData>> valueFormat = helper.discoverSinkFormat(
+ SerializationFormatFactory.class,
+ VALUE_FORMAT,
+ FactoryUtil.VALUE_FORMAT_PREFIX);
+ helper.validate();
+
+ return new DynamicTableSinkMock(
+ helper.getOptions().get(TARGET),
+ helper.getOptions().get(BUFFER_SIZE),
+ keyFormat.orElse(null),
+ valueFormat);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(TARGET);
+ options.add(VALUE_FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(BUFFER_SIZE);
+ options.add(KEY_FORMAT);
+ return options;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Table source
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * {@link DynamicTableSource} for testing.
+ */
+ public static class DynamicTableSourceMock implements ScanTableSource {
+
+ public final String target;
+ public final @Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat;
+ public final ScanFormat<DeserializationSchema<RowData>> sourceValueFormat;
+
+ DynamicTableSourceMock(
+ String target,
+ @Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat,
+ ScanFormat<DeserializationSchema<RowData>> sourceValueFormat) {
+ this.target = target;
+ this.sourceKeyFormat = sourceKeyFormat;
+ this.sourceValueFormat = sourceValueFormat;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return null;
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+ return null;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ return null;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DynamicTableSourceMock that = (DynamicTableSourceMock) o;
+ return target.equals(that.target) &&
+ Objects.equals(sourceKeyFormat, that.sourceKeyFormat) &&
+ sourceValueFormat.equals(that.sourceValueFormat);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(target, sourceKeyFormat, sourceValueFormat);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Table sink
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * {@link DynamicTableSink} for testing.
+ */
+ public static class DynamicTableSinkMock implements DynamicTableSink {
+
+ public final String target;
+ public final Long bufferSize;
+ public final @Nullable SinkFormat<SerializationSchema<RowData>> sinkKeyFormat;
+ public final SinkFormat<SerializationSchema<RowData>> sinkValueFormat;
+
+ DynamicTableSinkMock(
+ String target,
+ Long bufferSize,
+ @Nullable SinkFormat<SerializationSchema<RowData>> sinkKeyFormat,
+ SinkFormat<SerializationSchema<RowData>> sinkValueFormat) {
+ this.target = target;
+ this.bufferSize = bufferSize;
+ this.sinkKeyFormat = sinkKeyFormat;
+ this.sinkValueFormat = sinkValueFormat;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return null;
+ }
+
+ @Override
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ return null;
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return null;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DynamicTableSinkMock that = (DynamicTableSinkMock) o;
+ return target.equals(that.target) &&
+ bufferSize.equals(that.bufferSize) &&
+ Objects.equals(sinkKeyFormat, that.sinkKeyFormat) &&
+ sinkValueFormat.equals(that.sinkValueFormat);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(target, bufferSize, sinkKeyFormat, sinkValueFormat);
+ }
+ }
+}
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
new file mode 100644
index 0000000..c04bdb6
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Tests implementations for {@link DeserializationFormatFactory} and {@link SerializationFormatFactory}.
+ */
+public class TestFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
+
+ public static final String IDENTIFIER = "test-format";
+
+ public static final ConfigOption<String> DELIMITER = ConfigOptions
+ .key("delimiter")
+ .stringType()
+ .noDefaultValue();
+
+ public static final ConfigOption<Boolean> FAIL_ON_MISSING = ConfigOptions
+ .key("fail-on-missing")
+ .booleanType()
+ .defaultValue(false);
+
+ @Override
+ public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+ DynamicTableFactory.Context context,
+ ReadableConfig formatConfig) {
+ FactoryUtil.validateFactoryOptions(this, formatConfig);
+ return new ScanFormatMock(formatConfig.get(DELIMITER), formatConfig.get(FAIL_ON_MISSING));
+ }
+
+ @Override
+ public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+ DynamicTableFactory.Context context,
+ ReadableConfig formatConfig) {
+ FactoryUtil.validateFactoryOptions(this, formatConfig);
+ return new SinkFormatMock(formatConfig.get(DELIMITER));
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(DELIMITER);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FAIL_ON_MISSING);
+ return options;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Table source format
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * {@link ScanFormat} for testing.
+ */
+ public static class ScanFormatMock implements ScanFormat<DeserializationSchema<RowData>> {
+
+ public final String delimiter;
+ public final Boolean failOnMissing;
+
+ ScanFormatMock(String delimiter, Boolean failOnMissing) {
+ this.delimiter = delimiter;
+ this.failOnMissing = failOnMissing;
+ }
+
+ @Override
+ public DeserializationSchema<RowData> createScanFormat(
+ ScanTableSource.Context context,
+ DataType producedDataType) {
+ return null;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ScanFormatMock that = (ScanFormatMock) o;
+ return delimiter.equals(that.delimiter) && failOnMissing.equals(that.failOnMissing);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(delimiter, failOnMissing);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Table sink format
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * {@link SinkFormat} for testing.
+ */
+ public static class SinkFormatMock implements SinkFormat<SerializationSchema<RowData>> {
+
+ public final String delimiter;
+
+ SinkFormatMock(String delimiter) {
+ this.delimiter = delimiter;
+ }
+
+ @Override
+ public SerializationSchema<RowData> createSinkFormat(
+ ScanTableSource.Context context,
+ DataType consumeDataType) {
+ return null;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SinkFormatMock that = (SinkFormatMock) o;
+ return delimiter.equals(that.delimiter);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(delimiter);
+ }
+ }
+}
diff --git a/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..d31007a
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.factories.TestDynamicTableFactory
+org.apache.flink.table.factories.TestFormatFactory