Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/01 02:33:33 UTC

[flink] branch master updated: [FLINK-17887][table][connector] Improve interface of ScanFormatFactory and SinkFormatFactory

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 64de78e  [FLINK-17887][table][connector] Improve interface of ScanFormatFactory and SinkFormatFactory
64de78e is described below

commit 64de78e36500b5a8c8720639ded4d1c5f963ad41
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon Jun 1 10:32:40 2020 +0800

    [FLINK-17887][table][connector] Improve interface of ScanFormatFactory and SinkFormatFactory
    
    
    We improved the interfaces with the following changes:
    1. Introduce a common interface DynamicTableSource.Context, make the Context interfaces of ScanTableSource and LookupTableSource extend it, and rename them to ScanContext and LookupContext, respectively
    2. Change the parameter of ScanFormat#createScanFormat from ScanTableSource.Context to DynamicTableSource.Context
    3. Rename ScanFormat#createScanFormat to DecodingFormat#createRuntimeDecoder
    4. Rename SinkFormat#createSinkFormat to EncodingFormat#createRuntimeEncoder
    5. Rename ScanFormatFactory to DecodingFormatFactory
    6. Rename SinkFormatFactory to EncodingFormatFactory
    
    This closes #12320
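
For readers skimming the diff below, here is a minimal sketch of what an encoding format looks like against the renamed interfaces. This is an illustration only: the class name Utf8FirstFieldEncodingFormat and its UTF-8 payload are made up; the interface and method names (EncodingFormat, createRuntimeEncoder, getChangelogMode, discoverEncodingFormat) are the ones introduced by this commit.

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.format.EncodingFormat;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.DataType;

    /** Hypothetical EncodingFormat that writes the first field of each row as UTF-8 bytes. */
    public class Utf8FirstFieldEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {

        @Override
        public SerializationSchema<RowData> createRuntimeEncoder(
                DynamicTableSink.Context context,
                DataType consumedDataType) {
            // The runtime encoder is created from the sink context, mirroring how
            // DecodingFormat#createRuntimeDecoder now takes DynamicTableSource.Context.
            // Illustration only: emit the first column of each row as UTF-8 bytes.
            return row -> row.getString(0).toBytes();
        }

        @Override
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }
    }

A sink factory obtains such a format through the table factory helper, as the connectors changed below now do:

    final EncodingFormat<SerializationSchema<RowData>> format =
            helper.discoverEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT);
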
---
 .../table/Elasticsearch6DynamicSink.java           | 10 ++--
 .../table/Elasticsearch6DynamicSinkFactory.java    |  4 +-
 .../table/Elasticsearch6DynamicSinkTest.java       |  8 ++--
 .../table/Elasticsearch7DynamicSink.java           | 10 ++--
 .../table/Elasticsearch7DynamicSinkFactory.java    |  4 +-
 .../table/Elasticsearch7DynamicSinkTest.java       |  8 ++--
 .../hbase/source/HBaseDynamicTableSource.java      |  4 +-
 .../jdbc/table/JdbcDynamicTableSource.java         |  4 +-
 .../kafka/table/Kafka010DynamicSink.java           |  8 ++--
 .../kafka/table/Kafka010DynamicSource.java         | 10 ++--
 .../kafka/table/Kafka010DynamicTableFactory.java   | 12 ++---
 .../table/Kafka010DynamicTableFactoryTest.java     | 12 ++---
 .../kafka/table/Kafka011DynamicSink.java           |  8 ++--
 .../kafka/table/Kafka011DynamicSource.java         | 10 ++--
 .../kafka/table/Kafka011DynamicTableFactory.java   | 12 ++---
 .../table/Kafka011DynamicTableFactoryTest.java     | 12 ++---
 .../kafka/table/KafkaDynamicSinkBase.java          | 16 +++----
 .../kafka/table/KafkaDynamicSourceBase.java        | 22 ++++-----
 .../kafka/table/KafkaDynamicTableFactoryBase.java  | 20 ++++----
 .../table/KafkaDynamicTableFactoryTestBase.java    | 22 ++++-----
 .../connectors/kafka/table/KafkaDynamicSink.java   |  8 ++--
 .../connectors/kafka/table/KafkaDynamicSource.java | 10 ++--
 .../kafka/table/KafkaDynamicTableFactory.java      | 12 ++---
 .../kafka/table/KafkaDynamicTableFactoryTest.java  | 12 ++---
 .../flink/formats/avro/AvroFormatFactory.java      | 22 ++++-----
 .../flink/formats/avro/AvroFormatFactoryTest.java  |  8 ++--
 .../apache/flink/formats/csv/CsvFormatFactory.java | 22 ++++-----
 .../flink/formats/csv/CsvFormatFactoryTest.java    | 12 ++---
 .../flink/formats/json/JsonFormatFactory.java      | 22 ++++-----
 .../formats/json/canal/CanalJsonFormatFactory.java | 15 +++---
 .../json/debezium/DebeziumJsonFormatFactory.java   | 15 +++---
 .../flink/formats/json/JsonFormatFactoryTest.java  |  8 ++--
 .../json/canal/CanalJsonFormatFactoryTest.java     |  4 +-
 .../debezium/DebeziumJsonFormatFactoryTest.java    |  4 +-
 .../table/factories/DataGenTableSourceFactory.java |  2 +-
 .../{ScanFormat.java => DecodingFormat.java}       | 10 ++--
 .../{SinkFormat.java => EncodingFormat.java}       |  8 ++--
 .../flink/table/connector/format/Format.java       |  8 ++--
 .../table/connector/source/DynamicTableSource.java | 38 +++++++++++++++
 .../table/connector/source/LookupTableSource.java  | 27 +++--------
 .../table/connector/source/ScanTableSource.java    | 36 +++------------
 ...rmatFactory.java => DecodingFormatFactory.java} |  9 ++--
 .../factories/DeserializationFormatFactory.java    |  6 +--
 ...rmatFactory.java => EncodingFormatFactory.java} | 10 ++--
 .../apache/flink/table/factories/FactoryUtil.java  | 28 +++++------
 .../factories/SerializationFormatFactory.java      |  6 +--
 .../flink/table/factories/FactoryUtilTest.java     | 24 +++++-----
 .../table/factories/TestDynamicTableFactory.java   | 54 +++++++++++-----------
 .../flink/table/factories/TestFormatFactory.java   | 36 +++++++--------
 .../planner/factories/TestValuesTableFactory.java  |  8 ++--
 .../source/LookupRuntimeProviderContext.java       | 15 +++++-
 .../source/ScanRuntimeProviderContext.java         |  2 +-
 52 files changed, 366 insertions(+), 351 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
index eadf659..bedfbef 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
@@ -52,12 +52,12 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
 	@VisibleForTesting
 	static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7RequestFactory();
 
-	private final SinkFormat<SerializationSchema<RowData>> format;
+	private final EncodingFormat<SerializationSchema<RowData>> format;
 	private final TableSchema schema;
 	private final Elasticsearch6Configuration config;
 
 	public Elasticsearch6DynamicSink(
-			SinkFormat<SerializationSchema<RowData>> format,
+			EncodingFormat<SerializationSchema<RowData>> format,
 			Elasticsearch6Configuration config,
 			TableSchema schema) {
 		this(format, config, schema, (ElasticsearchSink.Builder::new));
@@ -83,7 +83,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
 	}
 
 	Elasticsearch6DynamicSink(
-			SinkFormat<SerializationSchema<RowData>> format,
+			EncodingFormat<SerializationSchema<RowData>> format,
 			Elasticsearch6Configuration config,
 			TableSchema schema,
 			ElasticSearchBuilderProvider builderProvider) {
@@ -111,7 +111,7 @@ final class Elasticsearch6DynamicSink implements DynamicTableSink {
 	@Override
 	public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
 		return () -> {
-			SerializationSchema<RowData> format = this.format.createSinkFormat(context, schema.toRowDataType());
+			SerializationSchema<RowData> format = this.format.createRuntimeEncoder(context, schema.toRowDataType());
 
 			final RowElasticsearchSinkFunction upsertFunction =
 				new RowElasticsearchSinkFunction(
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
index 65c90b5..c5d9c89 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -84,7 +84,7 @@ public class Elasticsearch6DynamicSinkFactory implements DynamicTableSinkFactory
 		ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
 		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+		final EncodingFormat<SerializationSchema<RowData>> format = helper.discoverEncodingFormat(
 			SerializationFormatFactory.class,
 			FORMAT_OPTION);
 
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
index df54147..1708efc 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -65,7 +65,7 @@ public class Elasticsearch6DynamicSinkTest {
 
 		BuilderProvider provider = new BuilderProvider();
 		final Elasticsearch6DynamicSink testSink = new Elasticsearch6DynamicSink(
-			new DummySinkFormat(),
+			new DummyEncodingFormat(),
 			new Elasticsearch6Configuration(getConfig(), this.getClass().getClassLoader()),
 			schema,
 			provider
@@ -141,9 +141,9 @@ public class Elasticsearch6DynamicSinkTest {
 		}
 	}
 
-	private static class DummySinkFormat implements SinkFormat<SerializationSchema<RowData>> {
+	private static class DummyEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {
 		@Override
-		public SerializationSchema<RowData> createSinkFormat(
+		public SerializationSchema<RowData> createRuntimeEncoder(
 				DynamicTableSink.Context context,
 				DataType consumedDataType) {
 			return DummySerializationSchema.INSTANCE;
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
index 4076b63..408673e 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
 import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
@@ -52,12 +52,12 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
 	@VisibleForTesting
 	static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory();
 
-	private final SinkFormat<SerializationSchema<RowData>> format;
+	private final EncodingFormat<SerializationSchema<RowData>> format;
 	private final TableSchema schema;
 	private final Elasticsearch7Configuration config;
 
 	public Elasticsearch7DynamicSink(
-			SinkFormat<SerializationSchema<RowData>> format,
+			EncodingFormat<SerializationSchema<RowData>> format,
 			Elasticsearch7Configuration config,
 			TableSchema schema) {
 		this(format, config, schema, (ElasticsearchSink.Builder::new));
@@ -83,7 +83,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
 	}
 
 	Elasticsearch7DynamicSink(
-			SinkFormat<SerializationSchema<RowData>> format,
+			EncodingFormat<SerializationSchema<RowData>> format,
 			Elasticsearch7Configuration config,
 			TableSchema schema,
 			ElasticSearchBuilderProvider builderProvider) {
@@ -111,7 +111,7 @@ final class Elasticsearch7DynamicSink implements DynamicTableSink {
 	@Override
 	public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
 		return () -> {
-			SerializationSchema<RowData> format = this.format.createSinkFormat(context, schema.toRowDataType());
+			SerializationSchema<RowData> format = this.format.createRuntimeEncoder(context, schema.toRowDataType());
 
 			final RowElasticsearchSinkFunction upsertFunction =
 				new RowElasticsearchSinkFunction(
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
index 055989b..ae7a9fd 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -83,7 +83,7 @@ public class Elasticsearch7DynamicSinkFactory implements DynamicTableSinkFactory
 
 		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+		final EncodingFormat<SerializationSchema<RowData>> format = helper.discoverEncodingFormat(
 			SerializationFormatFactory.class,
 			FORMAT_OPTION);
 
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
index 466ede3..c972cee 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -65,7 +65,7 @@ public class Elasticsearch7DynamicSinkTest {
 
 		BuilderProvider provider = new BuilderProvider();
 		final Elasticsearch7DynamicSink testSink = new Elasticsearch7DynamicSink(
-			new DummySinkFormat(),
+			new DummyEncodingFormat(),
 			new Elasticsearch7Configuration(getConfig(), this.getClass().getClassLoader()),
 			schema,
 			provider
@@ -141,9 +141,9 @@ public class Elasticsearch7DynamicSinkTest {
 		}
 	}
 
-	private static class DummySinkFormat implements SinkFormat<SerializationSchema<RowData>> {
+	private static class DummyEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {
 		@Override
-		public SerializationSchema<RowData> createSinkFormat(
+		public SerializationSchema<RowData> createRuntimeEncoder(
 				DynamicTableSink.Context context,
 				DataType consumedDataType) {
 			return DummySerializationSchema.INSTANCE;
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
index dcc5a5b..1dac67ae 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
@@ -58,12 +58,12 @@ public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSour
 	}
 
 	@Override
-	public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
 		return InputFormatProvider.of(new HBaseRowDataInputFormat(conf, tableName, hbaseSchema, nullStringLiteral));
 	}
 
 	@Override
-	public LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.Context context) {
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
 		checkArgument(context.getKeys().length == 1 && context.getKeys()[0].length == 1,
 			"Currently, HBase table can only be lookup by single rowkey.");
 		checkArgument(
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 21a80a2..5637ac4 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -65,7 +65,7 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 	}
 
 	@Override
-	public LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.Context context) {
+	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
 		// JDBC only support non-nested look up keys
 		String[] keyNames = new String[context.getKeys().length];
 		for (int i = 0; i < keyNames.length; i++) {
@@ -87,7 +87,7 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
 		final JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder()
 			.setDrivername(options.getDriverName())
 			.setDBUrl(options.getDbURL())
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java
index 0801065..9972794 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSink.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -42,13 +42,13 @@ public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
 		super(
 			consumedDataType,
 			topic,
 			properties,
 			partitioner,
-			sinkFormat);
+			encodingFormat);
 	}
 
 	@Override
@@ -71,7 +71,7 @@ public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
 				this.topic,
 				this.properties,
 				this.partitioner,
-				this.sinkFormat);
+				this.encodingFormat);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java
index 3747d01..61e586a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicSource.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -45,7 +45,7 @@ public class Kafka010DynamicSource extends KafkaDynamicSourceBase {
 	 * @param outputDataType         Source output data type
 	 * @param topic                  Kafka topic to consume
 	 * @param properties             Properties for the Kafka consumer
-	 * @param scanFormat             Scan format for decoding records from Kafka
+	 * @param decodingFormat         Decoding format for decoding records from Kafka
 	 * @param startupMode            Startup mode for the contained consumer
 	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
 	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}
@@ -56,7 +56,7 @@ public class Kafka010DynamicSource extends KafkaDynamicSourceBase {
 			DataType outputDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestampMillis) {
@@ -65,7 +65,7 @@ public class Kafka010DynamicSource extends KafkaDynamicSourceBase {
 			outputDataType,
 			topic,
 			properties,
-			scanFormat,
+			decodingFormat,
 			startupMode,
 			specificStartupOffsets,
 			startupTimestampMillis);
@@ -85,7 +85,7 @@ public class Kafka010DynamicSource extends KafkaDynamicSourceBase {
 				this.outputDataType,
 				this.topic,
 				this.properties,
-				this.scanFormat,
+				this.decodingFormat,
 				this.startupMode,
 				this.specificStartupOffsets,
 				this.startupTimestampMillis);
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
index bdf30d6..b5737c4 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
@@ -44,7 +44,7 @@ public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
 			DataType producedDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestampMillis) {
@@ -53,7 +53,7 @@ public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
 			producedDataType,
 			topic,
 			properties,
-			scanFormat,
+			decodingFormat,
 			startupMode,
 			specificStartupOffsets,
 			startupTimestampMillis);
@@ -65,14 +65,14 @@ public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
 
 		return new Kafka010DynamicSink(
 			consumedDataType,
 			topic,
 			properties,
 			partitioner,
-			sinkFormat);
+			encodingFormat);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java
index 320e239..12c892b 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactoryTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.streaming.connectors.kafka.Kafka010TableSourceSinkFactor
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
@@ -62,7 +62,7 @@ public class Kafka010DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
 			DataType producedDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestamp) {
@@ -70,7 +70,7 @@ public class Kafka010DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
 				producedDataType,
 				topic,
 				properties,
-				scanFormat,
+				decodingFormat,
 				startupMode,
 				specificStartupOffsets,
 				startupTimestamp);
@@ -82,12 +82,12 @@ public class Kafka010DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
 		return new Kafka010DynamicSink(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				sinkFormat);
+				encodingFormat);
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
index 55f679e..110dd9a 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
 import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -43,13 +43,13 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
 		super(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				sinkFormat);
+				encodingFormat);
 	}
 
 	@Override
@@ -72,7 +72,7 @@ public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
 				this.topic,
 				this.properties,
 				this.partitioner,
-				this.sinkFormat);
+				this.encodingFormat);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSource.java
index 661dde8..32b0b63 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSource.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -45,7 +45,7 @@ public class Kafka011DynamicSource extends KafkaDynamicSourceBase {
 	 * @param outputDataType         Source output data type
 	 * @param topic                  Kafka topic to consume
 	 * @param properties             Properties for the Kafka consumer
-	 * @param scanFormat             Scan format for decoding records from Kafka
+	 * @param decodingFormat         Decoding format for decoding records from Kafka
 	 * @param startupMode            Startup mode for the contained consumer
 	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
 	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}
@@ -56,7 +56,7 @@ public class Kafka011DynamicSource extends KafkaDynamicSourceBase {
 			DataType outputDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestampMillis) {
@@ -65,7 +65,7 @@ public class Kafka011DynamicSource extends KafkaDynamicSourceBase {
 				outputDataType,
 				topic,
 				properties,
-				scanFormat,
+				decodingFormat,
 				startupMode,
 				specificStartupOffsets,
 				startupTimestampMillis);
@@ -85,7 +85,7 @@ public class Kafka011DynamicSource extends KafkaDynamicSourceBase {
 				this.outputDataType,
 				this.topic,
 				this.properties,
-				this.scanFormat,
+				this.decodingFormat,
 				this.startupMode,
 				this.specificStartupOffsets,
 				this.startupTimestampMillis);
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java
index 3a02cfc..d928fe6 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactory.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
@@ -43,7 +43,7 @@ public class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
 			DataType producedDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestampMillis) {
@@ -51,7 +51,7 @@ public class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
 				producedDataType,
 				topic,
 				properties,
-				scanFormat,
+				decodingFormat,
 				startupMode,
 				specificStartupOffsets,
 				startupTimestampMillis);
@@ -63,13 +63,13 @@ public class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
 		return new Kafka011DynamicSink(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				sinkFormat);
+				encodingFormat);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
index 1c0e223..37ecf15 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactor
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
@@ -62,7 +62,7 @@ public class Kafka011DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
 			DataType producedDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestamp) {
@@ -70,7 +70,7 @@ public class Kafka011DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
 				producedDataType,
 				topic,
 				properties,
-				scanFormat,
+				decodingFormat,
 				startupMode,
 				specificStartupOffsets,
 				startupTimestamp);
@@ -82,12 +82,12 @@ public class Kafka011DynamicTableFactoryTest extends KafkaDynamicTableFactoryTes
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
 		return new Kafka011DynamicSink(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				sinkFormat);
+				encodingFormat);
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
index c90fc2e..44058d2 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
@@ -53,7 +53,7 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
 	protected final Properties properties;
 
 	/** Sink format for encoding records to Kafka. */
-	protected final SinkFormat<SerializationSchema<RowData>> sinkFormat;
+	protected final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
 
 	/** Partitioner to select Kafka partition for each item. */
 	protected final Optional<FlinkKafkaPartitioner<RowData>> partitioner;
@@ -63,23 +63,23 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
 		this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null.");
 		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
 		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
 		this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
-		this.sinkFormat = Preconditions.checkNotNull(sinkFormat, "Sink format must not be null.");
+		this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null.");
 	}
 
 	@Override
 	public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-		return this.sinkFormat.getChangelogMode();
+		return this.encodingFormat.getChangelogMode();
 	}
 
 	@Override
 	public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
 		SerializationSchema<RowData> serializationSchema =
-				this.sinkFormat.createSinkFormat(context, this.consumedDataType);
+				this.encodingFormat.createRuntimeEncoder(context, this.consumedDataType);
 
 		final SinkFunction<RowData> kafkaProducer = createKafkaProducer(
 				this.topic,
@@ -117,7 +117,7 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
 		return Objects.equals(consumedDataType, that.consumedDataType) &&
 			Objects.equals(topic, that.topic) &&
 			Objects.equals(properties, that.properties) &&
-			Objects.equals(sinkFormat, that.sinkFormat) &&
+			Objects.equals(encodingFormat, that.encodingFormat) &&
 			Objects.equals(partitioner, that.partitioner);
 	}
 
@@ -127,7 +127,7 @@ public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
 			consumedDataType,
 			topic,
 			properties,
-			sinkFormat,
+			encodingFormat,
 			partitioner);
 	}
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
index 0c97715..519cc8c 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.data.RowData;
@@ -55,7 +55,7 @@ public abstract class KafkaDynamicSourceBase implements ScanTableSource {
 	// --------------------------------------------------------------------------------------------
 
 	/** Scan format for decoding records from Kafka. */
-	protected final ScanFormat<DeserializationSchema<RowData>> scanFormat;
+	protected final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
 
 	// --------------------------------------------------------------------------------------------
 	// Kafka-specific attributes
@@ -85,7 +85,7 @@ public abstract class KafkaDynamicSourceBase implements ScanTableSource {
 	 * @param outputDataType         Source produced data type
 	 * @param topic                  Kafka topic to consume.
 	 * @param properties             Properties for the Kafka consumer.
-	 * @param scanFormat             Scan format for decoding records from Kafka.
+	 * @param decodingFormat         Decoding format for decoding records from Kafka.
 	 * @param startupMode            Startup mode for the contained consumer.
 	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
 	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}.
@@ -96,7 +96,7 @@ public abstract class KafkaDynamicSourceBase implements ScanTableSource {
 			DataType outputDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestampMillis) {
@@ -104,8 +104,8 @@ public abstract class KafkaDynamicSourceBase implements ScanTableSource {
 				outputDataType, "Produced data type must not be null.");
 		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
 		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
-		this.scanFormat = Preconditions.checkNotNull(
-				scanFormat, "Scan format must not be null.");
+		this.decodingFormat = Preconditions.checkNotNull(
+			decodingFormat, "Decoding format must not be null.");
 		this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
 		this.specificStartupOffsets = Preconditions.checkNotNull(
 			specificStartupOffsets, "Specific offsets must not be null.");
@@ -114,13 +114,13 @@ public abstract class KafkaDynamicSourceBase implements ScanTableSource {
 
 	@Override
 	public ChangelogMode getChangelogMode() {
-		return this.scanFormat.getChangelogMode();
+		return this.decodingFormat.getChangelogMode();
 	}
 
 	@Override
-	public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
 		DeserializationSchema<RowData> deserializationSchema =
-				this.scanFormat.createScanFormat(runtimeProviderContext, this.outputDataType);
+				this.decodingFormat.createRuntimeDecoder(runtimeProviderContext, this.outputDataType);
 		// Version-specific Kafka consumer
 		FlinkKafkaConsumerBase<RowData> kafkaConsumer =
 				getKafkaConsumer(topic, properties, deserializationSchema);
@@ -139,7 +139,7 @@ public abstract class KafkaDynamicSourceBase implements ScanTableSource {
 		return Objects.equals(outputDataType, that.outputDataType) &&
 			Objects.equals(topic, that.topic) &&
 			Objects.equals(properties, that.properties) &&
-			Objects.equals(scanFormat, that.scanFormat) &&
+			Objects.equals(decodingFormat, that.decodingFormat) &&
 			startupMode == that.startupMode &&
 			Objects.equals(specificStartupOffsets, that.specificStartupOffsets) &&
 			startupTimestampMillis == that.startupTimestampMillis;
@@ -151,7 +151,7 @@ public abstract class KafkaDynamicSourceBase implements ScanTableSource {
 			outputDataType,
 			topic,
 			properties,
-			scanFormat,
+			decodingFormat,
 			startupMode,
 			specificStartupOffsets,
 			startupTimestampMillis);
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
index 72a5bfb..1730809 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
@@ -25,8 +25,8 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
@@ -70,7 +70,7 @@ public abstract class KafkaDynamicTableFactoryBase implements
 		ReadableConfig tableOptions = helper.getOptions();
 
 		String topic = tableOptions.get(TOPIC);
-		ScanFormat<DeserializationSchema<RowData>> scanFormat = helper.discoverScanFormat(
+		DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
 				DeserializationFormatFactory.class,
 				FactoryUtil.FORMAT);
 		// Validate the option data type.
@@ -84,7 +84,7 @@ public abstract class KafkaDynamicTableFactoryBase implements
 				producedDataType,
 				topic,
 				getKafkaProperties(context.getCatalogTable().getOptions()),
-				scanFormat,
+				decodingFormat,
 				startupOptions.startupMode,
 				startupOptions.specificOffsets,
 				startupOptions.startupTimestampMillis);
@@ -97,7 +97,7 @@ public abstract class KafkaDynamicTableFactoryBase implements
 		ReadableConfig tableOptions = helper.getOptions();
 
 		String topic = tableOptions.get(TOPIC);
-		SinkFormat<SerializationSchema<RowData>> sinkFormat = helper.discoverSinkFormat(
+		EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
 				SerializationFormatFactory.class,
 				FactoryUtil.FORMAT);
 		// Validate the option data type.
@@ -111,7 +111,7 @@ public abstract class KafkaDynamicTableFactoryBase implements
 				topic,
 				getKafkaProperties(context.getCatalogTable().getOptions()),
 				getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()),
-				sinkFormat);
+				encodingFormat);
 	}
 
 	/**
@@ -120,7 +120,7 @@ public abstract class KafkaDynamicTableFactoryBase implements
 	 * @param producedDataType       Source produced data type
 	 * @param topic                  Kafka topic to consume
 	 * @param properties             Properties for the Kafka consumer
-	 * @param scanFormat             Scan format for decoding records from Kafka
+	 * @param decodingFormat         Decoding format for decoding records from Kafka
 	 * @param startupMode            Startup mode for the contained consumer
 	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
 	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}
@@ -129,7 +129,7 @@ public abstract class KafkaDynamicTableFactoryBase implements
 			DataType producedDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestampMillis);
@@ -141,14 +141,14 @@ public abstract class KafkaDynamicTableFactoryBase implements
 	 * @param topic            Kafka topic to consume
 	 * @param properties       Properties for the Kafka consumer
 	 * @param partitioner      Partitioner to select Kafka partition for each item
-	 * @param sinkFormat       Sink format for encoding records to Kafka
+	 * @param encodingFormat   Encoding format for encoding records to Kafka
 	 */
 	protected abstract KafkaDynamicSinkBase createKafkaTableSink(
 			DataType consumedDataType,
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat);
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat);
 
 	@Override
 	public Set<ConfigOption<?>> requiredOptions() {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
index 2080c2f..09372b1 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
@@ -34,8 +34,8 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -122,8 +122,8 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
 		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
 
-		ScanFormat<DeserializationSchema<RowData>> scanFormat =
-				new TestFormatFactory.ScanFormatMock(",", true);
+		DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+				new TestFormatFactory.DecodingFormatMock(",", true);
 
 		// Construct table source using options and table source factory
 		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
@@ -142,7 +142,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 				producedDataType,
 				TOPIC,
 				KAFKA_PROPERTIES,
-				scanFormat,
+				decodingFormat,
 				StartupMode.SPECIFIC_OFFSETS,
 				specificOffsets,
 				0);
@@ -189,8 +189,8 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 	@Test
 	public void testTableSink() {
 		final DataType consumedDataType = SINK_SCHEMA.toPhysicalRowDataType();
-		SinkFormat<SerializationSchema<RowData>> sinkFormat =
-				new TestFormatFactory.SinkFormatMock(",");
+		EncodingFormat<SerializationSchema<RowData>> encodingFormat =
+				new TestFormatFactory.EncodingFormatMock(",");
 
 		// Construct table sink using options and table sink factory.
 		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
@@ -210,12 +210,12 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 				TOPIC,
 				KAFKA_PROPERTIES,
 				Optional.of(new FlinkFixedPartitioner<>()),
-				sinkFormat);
+				encodingFormat);
 		assertEquals(expectedSink, actualSink);
 
 		// Test sink format.
 		final KafkaDynamicSinkBase actualKafkaSink = (KafkaDynamicSinkBase) actualSink;
-		assertEquals(sinkFormat, actualKafkaSink.sinkFormat);
+		assertEquals(encodingFormat, actualKafkaSink.encodingFormat);
 
 		// Test kafka producer.
 		DynamicTableSink.SinkRuntimeProvider provider =
@@ -408,7 +408,7 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 			DataType producedDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestamp
@@ -419,6 +419,6 @@ public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat
 	);
 }
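
    Note (illustrative sketch, not part of this patch): the abstract methods exercised by this test
    are fed from the factory base class, which discovers the renamed formats through FactoryUtil.
    The wiring looks roughly like the fragment below; the helper methods follow the FactoryUtil
    changes later in this patch, while FactoryUtil.FORMAT, getCatalogTable()/getSchema() and the
    connector-specific fields (topic, properties, partitioner, ...) are assumptions for the example.

	// Sketch of the factory-side wiring that hands the discovered formats to the abstract methods.
	@Override
	public DynamicTableSource createDynamicTableSource(Context context) {
		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
		// renamed from discoverScanFormat in this patch
		DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
				helper.discoverDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT);
		helper.validate();
		DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
		return createKafkaTableSource(
				producedDataType, topic, properties, decodingFormat,
				startupMode, specificStartupOffsets, startupTimestampMillis);
	}

	@Override
	public DynamicTableSink createDynamicTableSink(Context context) {
		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
		// renamed from discoverSinkFormat in this patch
		EncodingFormat<SerializationSchema<RowData>> encodingFormat =
				helper.discoverEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT);
		helper.validate();
		DataType consumedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
		return createKafkaTableSink(
				consumedDataType, topic, properties, Optional.of(partitioner), encodingFormat);
	}
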
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index 3a41b69..f3d6c1e 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -42,13 +42,13 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
 		super(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				sinkFormat);
+				encodingFormat);
 	}
 
 	@Override
@@ -71,7 +71,7 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
 				this.topic,
 				this.properties,
 				this.partitioner,
-				this.sinkFormat);
+				this.encodingFormat);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
index af2730f..a292fae 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.sources.StreamTableSource;
@@ -44,7 +44,7 @@ public class KafkaDynamicSource extends KafkaDynamicSourceBase {
 	 * @param outputDataType         Source output data type
 	 * @param topic                  Kafka topic to consume
 	 * @param properties             Properties for the Kafka consumer
-	 * @param scanFormat             Scan format for decoding records from Kafka
+	 * @param decodingFormat         Decoding format for decoding records from Kafka
 	 * @param startupMode            Startup mode for the contained consumer
 	 * @param specificStartupOffsets Specific startup offsets; only relevant when startup
 	 *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}
@@ -53,7 +53,7 @@ public class KafkaDynamicSource extends KafkaDynamicSourceBase {
 			DataType outputDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestampMillis) {
@@ -62,7 +62,7 @@ public class KafkaDynamicSource extends KafkaDynamicSourceBase {
 			outputDataType,
 			topic,
 			properties,
-			scanFormat,
+			decodingFormat,
 			startupMode,
 			specificStartupOffsets,
 			startupTimestampMillis);
@@ -82,7 +82,7 @@ public class KafkaDynamicSource extends KafkaDynamicSourceBase {
 				this.outputDataType,
 				this.topic,
 				this.properties,
-				this.scanFormat,
+				this.decodingFormat,
 				this.startupMode,
 				this.specificStartupOffsets,
 				this.startupTimestampMillis);
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 063ebe5..7242238 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
@@ -42,7 +42,7 @@ public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase {
 			DataType producedDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestampMillis) {
@@ -50,7 +50,7 @@ public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase {
 				producedDataType,
 				topic,
 				properties,
-				scanFormat,
+				decodingFormat,
 				startupMode,
 				specificStartupOffsets,
 				startupTimestampMillis);
@@ -62,13 +62,13 @@ public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase {
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
 		return new KafkaDynamicSink(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				sinkFormat);
+				encodingFormat);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 8698e2e..785578a 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -27,8 +27,8 @@ import org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
@@ -61,7 +61,7 @@ public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBa
 			DataType producedDataType,
 			String topic,
 			Properties properties,
-			ScanFormat<DeserializationSchema<RowData>> scanFormat,
+			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestamp) {
@@ -69,7 +69,7 @@ public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBa
 				producedDataType,
 				topic,
 				properties,
-				scanFormat,
+				decodingFormat,
 				startupMode,
 				specificStartupOffsets,
 				startupTimestamp);
@@ -81,12 +81,12 @@ public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBa
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
 		return new KafkaDynamicSink(
 				consumedDataType,
 				topic,
 				properties,
 				partitioner,
-				sinkFormat);
+				encodingFormat);
 	}
 }
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
index dbb2d7e..d07d17f 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
@@ -24,10 +24,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
@@ -50,19 +50,19 @@ public class AvroFormatFactory implements
 	public static final String IDENTIFIER = "avro";
 
 	@Override
-	public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatOptions) {
 		FactoryUtil.validateFactoryOptions(this, formatOptions);
 
-		return new ScanFormat<DeserializationSchema<RowData>>() {
+		return new DecodingFormat<DeserializationSchema<RowData>>() {
 			@Override
-			public DeserializationSchema<RowData> createScanFormat(
-					ScanTableSource.Context scanContext,
+			public DeserializationSchema<RowData> createRuntimeDecoder(
+					DynamicTableSource.Context context,
 					DataType producedDataType) {
 				final RowType rowType = (RowType) producedDataType.getLogicalType();
 				final TypeInformation<RowData> rowDataTypeInfo =
-						(TypeInformation<RowData>) scanContext.createTypeInformation(producedDataType);
+						(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
 				return new AvroRowDataDeserializationSchema(rowType, rowDataTypeInfo);
 			}
 
@@ -74,14 +74,14 @@ public class AvroFormatFactory implements
 	}
 
 	@Override
-	public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+	public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatOptions) {
 		FactoryUtil.validateFactoryOptions(this, formatOptions);
 
-		return new SinkFormat<SerializationSchema<RowData>>() {
+		return new EncodingFormat<SerializationSchema<RowData>>() {
 			@Override
-			public SerializationSchema<RowData> createSinkFormat(
+			public SerializationSchema<RowData> createRuntimeEncoder(
 					DynamicTableSink.Context context,
 					DataType consumedDataType) {
 				final RowType rowType = (RowType) consumedDataType.getLogicalType();
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
index d1c1156..0df0ea7 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
@@ -67,8 +67,8 @@ public class AvroFormatFactoryTest extends TestLogger {
 		TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
 				(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
 
-		DeserializationSchema<RowData> actualDeser = scanSourceMock.sourceValueFormat
-				.createScanFormat(
+		DeserializationSchema<RowData> actualDeser = scanSourceMock.valueFormat
+				.createRuntimeDecoder(
 						ScanRuntimeProviderContext.INSTANCE,
 						SCHEMA.toRowDataType());
 
@@ -82,8 +82,8 @@ public class AvroFormatFactoryTest extends TestLogger {
 		TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
 				(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
 
-		SerializationSchema<RowData> actualSer = sinkMock.sinkValueFormat
-				.createSinkFormat(
+		SerializationSchema<RowData> actualSer = sinkMock.valueFormat
+				.createRuntimeEncoder(
 						null,
 						SCHEMA.toRowDataType());
 
diff --git a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
index 0ebf0b9..9746ae7 100644
--- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
+++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java
@@ -25,10 +25,10 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
@@ -64,19 +64,19 @@ public final class CsvFormatFactory implements
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
 			DynamicTableFactory.Context context, ReadableConfig formatOptions) {
 		FactoryUtil.validateFactoryOptions(this, formatOptions);
 		validateFormatOptions(formatOptions);
 
-		return new ScanFormat<DeserializationSchema<RowData>>() {
+		return new DecodingFormat<DeserializationSchema<RowData>>() {
 			@Override
-			public DeserializationSchema<RowData> createScanFormat(
-					ScanTableSource.Context scanContext,
+			public DeserializationSchema<RowData> createRuntimeDecoder(
+					DynamicTableSource.Context context,
 					DataType producedDataType) {
 				final RowType rowType = (RowType) producedDataType.getLogicalType();
 				final TypeInformation<RowData> rowDataTypeInfo =
-						(TypeInformation<RowData>) scanContext.createTypeInformation(producedDataType);
+						(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
 				final CsvRowDataDeserializationSchema.Builder schemaBuilder =
 						new CsvRowDataDeserializationSchema.Builder(
 								rowType,
@@ -93,15 +93,15 @@ public final class CsvFormatFactory implements
 	}
 
 	@Override
-	public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+	public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatOptions) {
 		FactoryUtil.validateFactoryOptions(this, formatOptions);
 		validateFormatOptions(formatOptions);
 
-		return new SinkFormat<SerializationSchema<RowData>>() {
+		return new EncodingFormat<SerializationSchema<RowData>>() {
 			@Override
-			public SerializationSchema<RowData> createSinkFormat(
+			public SerializationSchema<RowData> createRuntimeEncoder(
 					DynamicTableSink.Context context,
 					DataType consumedDataType) {
 				final RowType rowType = (RowType) consumedDataType.getLogicalType();
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
index eb10b9c..78cdd42 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java
@@ -82,8 +82,8 @@ public class CsvFormatFactoryTest extends TestLogger {
 		TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
 				(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
 
-		DeserializationSchema<RowData> actualDeser = scanSourceMock.sourceValueFormat
-				.createScanFormat(
+		DeserializationSchema<RowData> actualDeser = scanSourceMock.valueFormat
+				.createRuntimeDecoder(
 						ScanRuntimeProviderContext.INSTANCE,
 						SCHEMA.toRowDataType());
 
@@ -103,8 +103,8 @@ public class CsvFormatFactoryTest extends TestLogger {
 		TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
 				(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
 
-		SerializationSchema<RowData> actualSer = sinkMock.sinkValueFormat
-				.createSinkFormat(
+		SerializationSchema<RowData> actualSer = sinkMock.valueFormat
+				.createRuntimeEncoder(
 						null,
 						SCHEMA.toRowDataType());
 
@@ -132,8 +132,8 @@ public class CsvFormatFactoryTest extends TestLogger {
 		TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
 				(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
 
-		SerializationSchema<RowData> actualSer = sinkMock.sinkValueFormat
-				.createSinkFormat(
+		SerializationSchema<RowData> actualSer = sinkMock.valueFormat
+				.createRuntimeEncoder(
 						null,
 						SCHEMA.toRowDataType());
 
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
index ca80159..ba98867 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
@@ -25,10 +25,10 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
@@ -56,7 +56,7 @@ public class JsonFormatFactory implements
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatOptions) {
 		FactoryUtil.validateFactoryOptions(this, formatOptions);
@@ -65,14 +65,14 @@ public class JsonFormatFactory implements
 		final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
 		final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
 
-		return new ScanFormat<DeserializationSchema<RowData>>() {
+		return new DecodingFormat<DeserializationSchema<RowData>>() {
 			@Override
-			public DeserializationSchema<RowData> createScanFormat(
-					ScanTableSource.Context scanContext,
+			public DeserializationSchema<RowData> createRuntimeDecoder(
+					DynamicTableSource.Context context,
 					DataType producedDataType) {
 				final RowType rowType = (RowType) producedDataType.getLogicalType();
 				final TypeInformation<RowData> rowDataTypeInfo =
-						(TypeInformation<RowData>) scanContext.createTypeInformation(producedDataType);
+						(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
 				return new JsonRowDataDeserializationSchema(
 						rowType,
 						rowDataTypeInfo,
@@ -88,14 +88,14 @@ public class JsonFormatFactory implements
 	}
 
 	@Override
-	public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+	public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatOptions) {
 		FactoryUtil.validateFactoryOptions(this, formatOptions);
 
-		return new SinkFormat<SerializationSchema<RowData>>() {
+		return new EncodingFormat<SerializationSchema<RowData>>() {
 			@Override
-			public SerializationSchema<RowData> createSinkFormat(
+			public SerializationSchema<RowData> createRuntimeEncoder(
 					DynamicTableSink.Context context,
 					DataType consumedDataType) {
 				final RowType rowType = (RowType) consumedDataType.getLogicalType();
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
index 1170953..fba98bf 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java
@@ -25,9 +25,9 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
@@ -57,15 +57,16 @@ public class CanalJsonFormatFactory implements DeserializationFormatFactory, Ser
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatOptions) {
 		FactoryUtil.validateFactoryOptions(this, formatOptions);
 		final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
 
-		return new ScanFormat<DeserializationSchema<RowData>>() {
+		return new DecodingFormat<DeserializationSchema<RowData>>() {
 			@Override
-			public DeserializationSchema<RowData> createScanFormat(ScanTableSource.Context context, DataType producedDataType) {
+			public DeserializationSchema<RowData> createRuntimeDecoder(
+					DynamicTableSource.Context context, DataType producedDataType) {
 				final RowType rowType = (RowType) producedDataType.getLogicalType();
 				final TypeInformation<RowData> rowDataTypeInfo =
 					(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
@@ -88,7 +89,7 @@ public class CanalJsonFormatFactory implements DeserializationFormatFactory, Ser
 	}
 
 	@Override
-	public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+	public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatOptions) {
 		throw new UnsupportedOperationException("Canal format doesn't support as a sink format yet.");
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
index e3bb0a4..3458014 100644
--- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java
@@ -25,9 +25,9 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
@@ -66,16 +66,17 @@ public class DebeziumJsonFormatFactory implements DeserializationFormatFactory,
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatOptions) {
 		FactoryUtil.validateFactoryOptions(this, formatOptions);
 		final boolean schemaInclude = formatOptions.get(SCHEMA_INCLUDE);
 		final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
 
-		return new ScanFormat<DeserializationSchema<RowData>>() {
+		return new DecodingFormat<DeserializationSchema<RowData>>() {
 			@Override
-			public DeserializationSchema<RowData> createScanFormat(ScanTableSource.Context context, DataType producedDataType) {
+			public DeserializationSchema<RowData> createRuntimeDecoder(
+					DynamicTableSource.Context context, DataType producedDataType) {
 				final RowType rowType = (RowType) producedDataType.getLogicalType();
 				final TypeInformation<RowData> rowDataTypeInfo =
 					(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
@@ -99,7 +100,7 @@ public class DebeziumJsonFormatFactory implements DeserializationFormatFactory,
 	}
 
 	@Override
-	public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+	public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatOptions) {
 		throw new UnsupportedOperationException("Debezium format doesn't support as a sink format yet.");
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
index d54daa6..8ddf03e 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java
@@ -108,8 +108,8 @@ public class JsonFormatFactoryTest extends TestLogger {
 		TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
 				(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
 
-		DeserializationSchema<RowData> actualDeser = scanSourceMock.sourceValueFormat
-				.createScanFormat(
+		DeserializationSchema<RowData> actualDeser = scanSourceMock.valueFormat
+				.createRuntimeDecoder(
 						ScanRuntimeProviderContext.INSTANCE,
 						SCHEMA.toRowDataType());
 
@@ -124,8 +124,8 @@ public class JsonFormatFactoryTest extends TestLogger {
 		TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
 				(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
 
-		SerializationSchema<RowData> actualSer = sinkMock.sinkValueFormat
-				.createSinkFormat(
+		SerializationSchema<RowData> actualSer = sinkMock.valueFormat
+				.createRuntimeEncoder(
 						new SinkRuntimeProviderContext(false),
 						SCHEMA.toRowDataType());
 
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
index e675873..83afa11 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java
@@ -74,8 +74,8 @@ public class CanalJsonFormatFactoryTest extends TestLogger {
 		TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
 				(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
 
-		DeserializationSchema<RowData> actualDeser = scanSourceMock.sourceValueFormat
-				.createScanFormat(
+		DeserializationSchema<RowData> actualDeser = scanSourceMock.valueFormat
+				.createRuntimeDecoder(
 						ScanRuntimeProviderContext.INSTANCE,
 						SCHEMA.toRowDataType());
 
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
index 8eb6d9b..869ff28 100644
--- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java
@@ -75,8 +75,8 @@ public class DebeziumJsonFormatFactoryTest extends TestLogger {
 		TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
 				(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
 
-		DeserializationSchema<RowData> actualDeser = scanSourceMock.sourceValueFormat
-				.createScanFormat(
+		DeserializationSchema<RowData> actualDeser = scanSourceMock.valueFormat
+				.createRuntimeDecoder(
 						ScanRuntimeProviderContext.INSTANCE,
 						SCHEMA.toRowDataType());
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
index 9a07f8f..27a517b 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java
@@ -241,7 +241,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
 		}
 
 		@Override
-		public ScanRuntimeProvider getScanRuntimeProvider(Context context) {
+		public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
 			return SourceFunctionProvider.of(createSource(), false);
 		}
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
similarity index 73%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
index 529ea37..f433026 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ScanFormat.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
@@ -19,19 +19,19 @@
 package org.apache.flink.table.connector.format;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.types.DataType;
 
 /**
- * A {@link Format} for a {@link ScanTableSource}.
+ * A {@link Format} for a {@link DynamicTableSource} for reading rows.
  *
  * @param <I> runtime interface needed by the table source
  */
 @PublicEvolving
-public interface ScanFormat<I> extends Format {
+public interface DecodingFormat<I> extends Format {
 
 	/**
-	 * Creates runtime implementation that is configured to produce data of the given data type.
+	 * Creates runtime decoder implementation that is configured to produce data of the given data type.
 	 */
-	I createScanFormat(ScanTableSource.Context context, DataType producedDataType);
+	I createRuntimeDecoder(DynamicTableSource.Context context, DataType producedDataType);
 }
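
    Note (illustrative sketch, not part of this patch): because ScanContext and LookupContext both
    extend DynamicTableSource.Context (see the source interface changes below), the same
    DecodingFormat instance can be asked for a runtime decoder from either kind of source. The
    scan-side call looks roughly as follows; the class fields and createSourceFunction(...) are
    hypothetical placeholders for the connector-specific runtime code.

	// Hypothetical scan source fragment; producedDataType and decodingFormat are fields set by the factory.
	@Override
	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
		// ScanContext extends DynamicTableSource.Context, so it fits the new
		// DecodingFormat#createRuntimeDecoder signature directly.
		DeserializationSchema<RowData> deserializer =
				decodingFormat.createRuntimeDecoder(runtimeProviderContext, producedDataType);
		// the second argument of SourceFunctionProvider.of marks the source as unbounded here
		return SourceFunctionProvider.of(createSourceFunction(deserializer), false);
	}
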
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/EncodingFormat.java
similarity index 78%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/EncodingFormat.java
index 82589f4..c7eea56 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/SinkFormat.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/EncodingFormat.java
@@ -23,15 +23,15 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.types.DataType;
 
 /**
- * A {@link Format} for a {@link DynamicTableSink}.
+ * A {@link Format} for a {@link DynamicTableSink} for writing rows.
  *
  * @param <I> runtime interface needed by the table sink
  */
 @PublicEvolving
-public interface SinkFormat<I> extends Format {
+public interface EncodingFormat<I> extends Format {
 
 	/**
-	 * Creates runtime implementation that is configured to consume data of the given data type.
+	 * Creates runtime encoder implementation that is configured to consume data of the given data type.
 	 */
-	I createSinkFormat(DynamicTableSink.Context context, DataType consumedDataType);
+	I createRuntimeEncoder(DynamicTableSink.Context context, DataType consumedDataType);
 }
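
    Note (illustrative sketch, not part of this patch): the encoder-side counterpart as a sink would
    call it from getSinkRuntimeProvider; consumedDataType, encodingFormat and createSinkFunction(...)
    are placeholders for connector-specific fields and runtime code.

	@Override
	public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
		SerializationSchema<RowData> serializer =
				encodingFormat.createRuntimeEncoder(context, consumedDataType);
		// wrap the serializer into the connector-specific producer, e.g. a Kafka producer
		return SinkFunctionProvider.of(createSinkFunction(serializer));
	}
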
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
index 40c0115..9177316 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
 
 /**
@@ -34,15 +34,15 @@ import org.apache.flink.table.factories.DynamicTableFactory;
  *
  * <p>Formats can be distinguished along two dimensions:
  * <ul>
- *     <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}).
+ *     <li>Context in which the format is applied ({@link DynamicTableSource} or {@link DynamicTableSink}).
  *     <li>Runtime implementation interface that is required (e.g. {@link DeserializationSchema} or
  *     some bulk interface).</li>
  * </ul>
  *
  * <p>A {@link DynamicTableFactory} can search for a format that is accepted by the connector.
  *
- * @see ScanFormat
- * @see SinkFormat
+ * @see DecodingFormat
+ * @see EncodingFormat
  */
 @PublicEvolving
 public interface Format {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
index 943d28b..1e31630 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java
@@ -19,15 +19,20 @@
 package org.apache.flink.table.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.RuntimeConverter;
 import org.apache.flink.table.connector.source.abilities.SupportsComputedColumnPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.Row;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
+
 /**
  * Source of a dynamic table from an external storage system.
  *
@@ -72,6 +77,39 @@ public interface DynamicTableSource {
 	// --------------------------------------------------------------------------------------------
 
 	/**
+	 * Base context for creating runtime implementation via a {@link ScanTableSource.ScanRuntimeProvider}
+	 * and {@link LookupTableSource.LookupRuntimeProvider}.
+	 *
+	 * <p>It offers utilities by the planner for creating runtime implementation with minimal dependencies
+	 * to internal data structures.
+	 *
+	 * <p>Methods should be called in {@link ScanTableSource#getScanRuntimeProvider(ScanTableSource.ScanContext)}
+	 * and {@link LookupTableSource#getLookupRuntimeProvider(LookupTableSource.LookupContext)}. The returned
+	 * instances are {@link Serializable} and can be directly passed into the runtime implementation class.
+	 */
+	interface Context {
+
+		/**
+		 * Creates type information describing the internal data structures of the given {@link DataType}.
+		 *
+		 * @see TableSchema#toPhysicalRowDataType()
+		 */
+		TypeInformation<?> createTypeInformation(DataType producedDataType);
+
+		/**
+		 * Creates a converter for mapping between objects specified by the given {@link DataType} and
+		 * Flink's internal data structures that can be passed into a runtime implementation.
+		 *
+		 * <p>For example, a {@link Row} and its fields can be converted into {@link RowData}, or a (possibly
+		 * nested) POJO can be converted into the internal representation for structured types.
+		 *
+		 * @see LogicalType#supportsInputConversion(Class)
+		 * @see TableSchema#toPhysicalRowDataType()
+		 */
+		DataStructureConverter createDataStructureConverter(DataType producedDataType);
+	}
+
+	/**
 	 * Converter for mapping between objects and Flink's internal data structures during runtime.
 	 *
 	 * <p>On request, the planner will provide a specialized (possibly code generated) converter that
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java
index fa61d99..363873e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java
@@ -19,10 +19,6 @@
 package org.apache.flink.table.connector.source;
 
 import org.apache.flink.annotation.Experimental;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
 import java.io.Serializable;
@@ -37,9 +33,9 @@ import java.io.Serializable;
  * <p>Note: Compared to {@link ScanTableSource}, a {@link LookupTableSource} currently only supports emitting
  * insert-only changes (see also {@link RowKind}). Further abilities are not supported.
  *
- * <p>In the last step, the planner will call {@link #getLookupRuntimeProvider(Context)} for obtaining a
+ * <p>In the last step, the planner will call {@link #getLookupRuntimeProvider(LookupContext)} for obtaining a
  * provider of runtime implementation. The key fields that are required to perform a lookup are derived
- * from a query by the planner and will be provided in the given {@link Context#getKeys()}. The values
+ * from a query by the planner and will be provided in the given {@link LookupContext#getKeys()}. The values
  * for those key fields are passed during runtime.
  */
 @Experimental
@@ -54,13 +50,13 @@ public interface LookupTableSource extends DynamicTableSource {
 	 * <p>Independent of the provider interface, a source implementation can work on either arbitrary
 	 * objects or internal data structures (see {@link org.apache.flink.table.data} for more information).
 	 *
-	 * <p>The given {@link Context} offers utilities by the planner for creating runtime implementation
+	 * <p>The given {@link LookupContext} offers utilities by the planner for creating runtime implementation
 	 * with minimal dependencies to internal data structures.
 	 *
 	 * @see TableFunctionProvider
 	 * @see AsyncTableFunctionProvider
 	 */
-	LookupRuntimeProvider getLookupRuntimeProvider(Context context);
+	LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
 
 	// --------------------------------------------------------------------------------------------
 	// Helper interfaces
@@ -72,10 +68,10 @@ public interface LookupTableSource extends DynamicTableSource {
 	 * <p>It offers utilities by the planner for creating runtime implementation with minimal dependencies
 	 * to internal data structures.
 	 *
-	 * <p>Methods should be called in {@link #getLookupRuntimeProvider(Context)}. Returned instances
+	 * <p>Methods should be called in {@link #getLookupRuntimeProvider(LookupContext)}. Returned instances
 	 * that are {@link Serializable} can be directly passed into the runtime implementation class.
 	 */
-	interface Context {
+	interface LookupContext extends DynamicTableSource.Context {
 
 		/**
 		 * Returns an array of key index paths that should be used during the lookup. The indices are
@@ -88,17 +84,6 @@ public interface LookupTableSource extends DynamicTableSource {
 		 * @return array of key index paths
 		 */
 		int[][] getKeys();
-
-		/**
-		 * Creates a converter for mapping between objects specified by the given {@link DataType} and
-		 * Flink's internal data structures that can be passed into a runtime implementation.
-		 *
-		 * <p>For example, a {@link Row} and its fields can be converted into {@link RowData} or a (possibly
-		 * nested) POJO can be converted into the internal representation for structured types.
-		 *
-		 * @see LogicalType#supportsInputConversion(Class)
-		 */
-		DataStructureConverter createDataStructureConverter(DataType producedDataType);
 	}
 
 	/**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java
index 661ebdf..8c6947a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/ScanTableSource.java
@@ -19,18 +19,12 @@
 package org.apache.flink.table.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.abilities.SupportsComputedColumnPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
 import java.io.Serializable;
@@ -59,7 +53,7 @@ import java.io.Serializable;
  *     <li>{@link SupportsPartitionPushDown}
  * </ul>
  *
- * <p>In the last step, the planner will call {@link #getScanRuntimeProvider(Context)} for obtaining a
+ * <p>In the last step, the planner will call {@link #getScanRuntimeProvider(ScanContext)} for obtaining a
  * provider of runtime implementation.
  */
 @PublicEvolving
@@ -82,12 +76,12 @@ public interface ScanTableSource extends DynamicTableSource {
 	 * <p>Independent of the provider interface, the table runtime expects that a source implementation
 	 * emits internal data structures (see {@link org.apache.flink.table.data.RowData} for more information).
 	 *
-	 * <p>The given {@link Context} offers utilities by the planner for creating runtime implementation
+	 * <p>The given {@link ScanContext} offers utilities by the planner for creating runtime implementation
 	 * with minimal dependencies to internal data structures.
 	 *
 	 * <p>See {@code org.apache.flink.table.connector.source.SourceFunctionProvider} in {@code flink-table-api-java-bridge}.
 	 */
-	ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext);
+	ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext);
 
 	// --------------------------------------------------------------------------------------------
 	// Helper interfaces
@@ -99,29 +93,11 @@ public interface ScanTableSource extends DynamicTableSource {
 	 * <p>It offers utilities by the planner for creating runtime implementation with minimal dependencies
 	 * to internal data structures.
 	 *
-	 * <p>Methods should be called in {@link #getScanRuntimeProvider(Context)}. The returned instances
+	 * <p>Methods should be called in {@link #getScanRuntimeProvider(ScanContext)}. The returned instances
 	 * are {@link Serializable} and can be directly passed into the runtime implementation class.
 	 */
-	interface Context {
-
-		/**
-		 * Creates type information describing the internal data structures of the given {@link DataType}.
-		 *
-		 * @see TableSchema#toPhysicalRowDataType()
-		 */
-		TypeInformation<?> createTypeInformation(DataType producedDataType);
-
-		/**
-		 * Creates a converter for mapping between objects specified by the given {@link DataType} and
-		 * Flink's internal data structures that can be passed into a runtime implementation.
-		 *
-		 * <p>For example, a {@link Row} and its fields can be converted into {@link RowData}, or a (possibly
-		 * nested) POJO can be converted into the internal representation for structured types.
-		 *
-		 * @see LogicalType#supportsInputConversion(Class)
-		 * @see TableSchema#toPhysicalRowDataType()
-		 */
-		DataStructureConverter createDataStructureConverter(DataType producedDataType);
+	interface ScanContext extends DynamicTableSource.Context {
+		// may introduce scan specific methods in the future
 	}
 
 	/**
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DecodingFormatFactory.java
similarity index 81%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DecodingFormatFactory.java
index 184c432..5486aaa 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/ScanFormatFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DecodingFormatFactory.java
@@ -20,12 +20,13 @@ package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.Format;
-import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 
 /**
- * Base interface for configuring a {@link ScanFormat} for a {@link ScanTableSource}.
+ * Base interface for configuring a {@link DecodingFormat} for {@link ScanTableSource} and {@link LookupTableSource}.
  *
  * <p>Depending on the kind of external system, a connector might support different encodings for
  * reading and writing rows. This interface helps in making such formats pluggable.
@@ -38,7 +39,7 @@ import org.apache.flink.table.connector.source.ScanTableSource;
  * @param <I> runtime interface needed by the table source
  */
 @PublicEvolving
-public interface ScanFormatFactory<I> extends Factory {
+public interface DecodingFormatFactory<I> extends Factory {
 
 	/**
 	 * Creates a format from the given context and format options.
@@ -46,5 +47,5 @@ public interface ScanFormatFactory<I> extends Factory {
 	 * <p>The format options have been projected to top-level options (e.g. from {@code key.format.ignore-errors}
 	 * to {@code format.ignore-errors}).
 	 */
-	ScanFormat<I> createScanFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
+	DecodingFormat<I> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
 }
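
    Note (illustrative sketch, not part of this patch): the javadoc above now mentions
    LookupTableSource as well because LookupContext extends the new DynamicTableSource.Context.
    Whether a particular lookup connector actually uses a decoding format is connector-specific;
    the fragment below only shows that the widened signature makes the call possible, with
    hypothetical field and helper names throughout.

	// Hypothetical lookup source fragment reusing the same DecodingFormat instance.
	@Override
	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
		// LookupContext extends DynamicTableSource.Context, so no scan-specific context is needed.
		DeserializationSchema<RowData> deserializer =
				decodingFormat.createRuntimeDecoder(context, producedDataType);
		return TableFunctionProvider.of(createLookupFunction(deserializer, context.getKeys()));
	}
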
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
index 1901fc5..d340035 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationFormatFactory.java
@@ -20,15 +20,15 @@ package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.data.RowData;
 
 /**
- * Factory for creating a {@link ScanFormat} for {@link DeserializationSchema}.
+ * Factory for creating a {@link DecodingFormat} for {@link DeserializationSchema}.
  *
  * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
  */
 @PublicEvolving
-public interface DeserializationFormatFactory extends ScanFormatFactory<DeserializationSchema<RowData>> {
+public interface DeserializationFormatFactory extends DecodingFormatFactory<DeserializationSchema<RowData>> {
   // interface is used for discovery but is already fully specified by the generics
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/EncodingFormatFactory.java
similarity index 82%
rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/EncodingFormatFactory.java
index 4212ea6..578a027 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SinkFormatFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/EncodingFormatFactory.java
@@ -20,12 +20,12 @@ package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.format.Format;
-import org.apache.flink.table.connector.format.SinkFormat;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
 
 /**
- * Base interface for configuring a {@link SinkFormat} for a {@link ScanTableSource}.
+ * Base interface for configuring an {@link EncodingFormat} for a {@link DynamicTableSink}.
  *
  * <p>Depending on the kind of external system, a connector might support different encodings for
  * reading and writing rows. This interface helps in making such formats pluggable.
@@ -38,7 +38,7 @@ import org.apache.flink.table.connector.source.ScanTableSource;
  * @param <I> runtime interface needed by the table sink
  */
 @PublicEvolving
-public interface SinkFormatFactory<I> extends Factory {
+public interface EncodingFormatFactory<I> extends Factory {
 
 	/**
 	 * Creates a format from the given context and format options.
@@ -46,5 +46,5 @@ public interface SinkFormatFactory<I> extends Factory {
 	 * <p>The format options have been projected to top-level options (e.g. from {@code key.format.ignore-errors}
 	 * to {@code format.ignore-errors}).
 	 */
-	SinkFormat<I> createSinkFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
+	EncodingFormat<I> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
 }
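
    Note (illustrative sketch, not part of this patch): connectors normally do not instantiate an
    EncodingFormatFactory directly but go through the renamed FactoryUtil helpers shown in the next
    file. A sink factory would typically call them like this (FactoryUtil.FORMAT is an assumed
    option constant for the example):

	// Required format: fails with a ValidationException if no matching factory is found.
	EncodingFormat<SerializationSchema<RowData>> encodingFormat =
			helper.discoverEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT);

	// Optional variant: lets the connector fall back to its own default behavior.
	Optional<EncodingFormat<SerializationSchema<RowData>>> maybeFormat =
			helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FactoryUtil.FORMAT);
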
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
index cdbb8b5..e3783c2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
@@ -29,8 +29,8 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.utils.EncodingUtils;
@@ -405,29 +405,29 @@ public final class FactoryUtil {
 		}
 
 		/**
-		 * Discovers a {@link ScanFormat} of the given type using the given option as factory identifier.
+		 * Discovers a {@link DecodingFormat} of the given type using the given option as factory identifier.
 		 */
-		public <I, F extends ScanFormatFactory<I>> ScanFormat<I> discoverScanFormat(
+		public <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> discoverDecodingFormat(
 				Class<F> formatFactoryClass,
 				ConfigOption<String> formatOption) {
-			return discoverOptionalScanFormat(formatFactoryClass, formatOption)
+			return discoverOptionalDecodingFormat(formatFactoryClass, formatOption)
 				.orElseThrow(() ->
 					new ValidationException(
 						String.format("Could not find required scan format '%s'.", formatOption.key())));
 		}
 
 		/**
-		 * Discovers a {@link ScanFormat} of the given type using the given option (if present) as factory
+		 * Discovers a {@link DecodingFormat} of the given type using the given option (if present) as factory
 		 * identifier.
 		 */
-		public <I, F extends ScanFormatFactory<I>> Optional<ScanFormat<I>> discoverOptionalScanFormat(
+		public <I, F extends DecodingFormatFactory<I>> Optional<DecodingFormat<I>> discoverOptionalDecodingFormat(
 				Class<F> formatFactoryClass,
 				ConfigOption<String> formatOption) {
 			return discoverOptionalFormatFactory(formatFactoryClass, formatOption)
 				.map(formatFactory -> {
 					String formatPrefix = formatPrefix(formatFactory, formatOption);
 					try {
-						return formatFactory.createScanFormat(context, projectOptions(formatPrefix));
+						return formatFactory.createDecodingFormat(context, projectOptions(formatPrefix));
 					} catch (Throwable t) {
 						throw new ValidationException(
 							String.format(
@@ -440,29 +440,29 @@ public final class FactoryUtil {
 		}
 
 		/**
-		 * Discovers a {@link SinkFormat} of the given type using the given option as factory identifier.
+		 * Discovers an {@link EncodingFormat} of the given type using the given option as factory identifier.
 		 */
-		public <I, F extends SinkFormatFactory<I>> SinkFormat<I> discoverSinkFormat(
+		public <I, F extends EncodingFormatFactory<I>> EncodingFormat<I> discoverEncodingFormat(
 				Class<F> formatFactoryClass,
 				ConfigOption<String> formatOption) {
-			return discoverOptionalSinkFormat(formatFactoryClass, formatOption)
+			return discoverOptionalEncodingFormat(formatFactoryClass, formatOption)
 				.orElseThrow(() ->
 					new ValidationException(
 						String.format("Could not find required sink format '%s'.", formatOption.key())));
 		}
 
 		/**
-		 * Discovers a {@link SinkFormat} of the given type using the given option (if present) as factory
+		 * Discovers an {@link EncodingFormat} of the given type using the given option (if present) as factory
 		 * identifier.
 		 */
-		public <I, F extends SinkFormatFactory<I>> Optional<SinkFormat<I>> discoverOptionalSinkFormat(
+		public <I, F extends EncodingFormatFactory<I>> Optional<EncodingFormat<I>> discoverOptionalEncodingFormat(
 				Class<F> formatFactoryClass,
 				ConfigOption<String> formatOption) {
 			return discoverOptionalFormatFactory(formatFactoryClass, formatOption)
 				.map(formatFactory -> {
 					String formatPrefix = formatPrefix(formatFactory, formatOption);
 					try {
-						return formatFactory.createSinkFormat(context, projectOptions(formatPrefix));
+						return formatFactory.createEncodingFormat(context, projectOptions(formatPrefix));
 					} catch (Throwable t) {
 						throw new ValidationException(
 							String.format(
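
On the connector side, the renamed discovery methods are called through the same TableFactoryHelper as before. A hedged sketch of a table source factory follows; the FORMAT option, MySource, and the surrounding DynamicTableSourceFactory implementation are illustrative assumptions (compare the TestDynamicTableFactory changes below):

    private static final ConfigOption<String> FORMAT = ConfigOptions
        .key("format")
        .stringType()
        .noDefaultValue();

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // throws a ValidationException if no matching DecodingFormatFactory is found
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
            helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT);
        helper.validate();
        return new MySource(decodingFormat);
    }
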
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
index 9b669df..1993d6a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/SerializationFormatFactory.java
@@ -20,15 +20,15 @@ package org.apache.flink.table.factories;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.data.RowData;
 
 /**
- * Factory for creating a {@link SinkFormat} for {@link SerializationSchema}.
+ * Factory for creating an {@link EncodingFormat} for {@link SerializationSchema}.
  *
  * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
  */
 @PublicEvolving
-public interface SerializationFormatFactory extends SinkFormatFactory<SerializationSchema<RowData>> {
+public interface SerializationFormatFactory extends EncodingFormatFactory<SerializationSchema<RowData>> {
   // interface is used for discovery but is already fully specified by the generics
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
index ca2c2f7..886f859 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSinkMock;
 import org.apache.flink.table.factories.TestDynamicTableFactory.DynamicTableSourceMock;
-import org.apache.flink.table.factories.TestFormatFactory.ScanFormatMock;
-import org.apache.flink.table.factories.TestFormatFactory.SinkFormatMock;
+import org.apache.flink.table.factories.TestFormatFactory.DecodingFormatMock;
+import org.apache.flink.table.factories.TestFormatFactory.EncodingFormatMock;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -147,15 +147,15 @@ public class FactoryUtilTest {
 		final DynamicTableSource actualSource = createTableSource(options);
 		final DynamicTableSource expectedSource = new DynamicTableSourceMock(
 			"MyTarget",
-			new ScanFormatMock(",", false),
-			new ScanFormatMock("|", true));
+			new DecodingFormatMock(",", false),
+			new DecodingFormatMock("|", true));
 		assertEquals(expectedSource, actualSource);
 		final DynamicTableSink actualSink = createTableSink(options);
 		final DynamicTableSink expectedSink = new DynamicTableSinkMock(
 			"MyTarget",
 			1000L,
-			new SinkFormatMock(","),
-			new SinkFormatMock("|"));
+			new EncodingFormatMock(","),
+			new EncodingFormatMock("|"));
 		assertEquals(expectedSink, actualSink);
 	}
 
@@ -168,14 +168,14 @@ public class FactoryUtilTest {
 		final DynamicTableSource expectedSource = new DynamicTableSourceMock(
 			"MyTarget",
 			null,
-			new ScanFormatMock("|", true));
+			new DecodingFormatMock("|", true));
 		assertEquals(expectedSource, actualSource);
 		final DynamicTableSink actualSink = createTableSink(options);
 		final DynamicTableSink expectedSink = new DynamicTableSinkMock(
 			"MyTarget",
 			1000L,
 			null,
-			new SinkFormatMock("|"));
+			new EncodingFormatMock("|"));
 		assertEquals(expectedSink, actualSink);
 	}
 
@@ -191,15 +191,15 @@ public class FactoryUtilTest {
 		final DynamicTableSource actualSource = createTableSource(options);
 		final DynamicTableSource expectedSource = new DynamicTableSourceMock(
 			"MyTarget",
-			new ScanFormatMock(",", false),
-			new ScanFormatMock(";", true));
+			new DecodingFormatMock(",", false),
+			new DecodingFormatMock(";", true));
 		assertEquals(expectedSource, actualSource);
 		final DynamicTableSink actualSink = createTableSink(options);
 		final DynamicTableSink expectedSink = new DynamicTableSinkMock(
 			"MyTarget",
 			1000L,
-			new SinkFormatMock(","),
-			new SinkFormatMock(";"));
+			new EncodingFormatMock(","),
+			new EncodingFormatMock(";"));
 		assertEquals(expectedSink, actualSink);
 	}
 
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
index 94e1aa1..ad1de6b 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -63,13 +63,13 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
 	public DynamicTableSource createDynamicTableSource(Context context) {
 		final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-		final Optional<ScanFormat<DeserializationSchema<RowData>>> keyFormat = helper.discoverOptionalScanFormat(
+		final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyFormat = helper.discoverOptionalDecodingFormat(
 			DeserializationFormatFactory.class,
 			KEY_FORMAT);
-		final ScanFormat<DeserializationSchema<RowData>> valueFormat = helper.discoverOptionalScanFormat(
+		final DecodingFormat<DeserializationSchema<RowData>> valueFormat = helper.discoverOptionalDecodingFormat(
 			DeserializationFormatFactory.class,
 			FORMAT).orElseGet(
-				() -> helper.discoverScanFormat(
+				() -> helper.discoverDecodingFormat(
 					DeserializationFormatFactory.class,
 					VALUE_FORMAT));
 		helper.validate();
@@ -84,13 +84,13 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
 	public DynamicTableSink createDynamicTableSink(Context context) {
 		final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-		final Optional<SinkFormat<SerializationSchema<RowData>>> keyFormat = helper.discoverOptionalSinkFormat(
+		final Optional<EncodingFormat<SerializationSchema<RowData>>> keyFormat = helper.discoverOptionalEncodingFormat(
 			SerializationFormatFactory.class,
 			KEY_FORMAT);
-		final SinkFormat<SerializationSchema<RowData>> valueFormat = helper.discoverOptionalSinkFormat(
+		final EncodingFormat<SerializationSchema<RowData>> valueFormat = helper.discoverOptionalEncodingFormat(
 			SerializationFormatFactory.class,
 			FORMAT).orElseGet(
-				() -> helper.discoverSinkFormat(
+				() -> helper.discoverEncodingFormat(
 					SerializationFormatFactory.class,
 					VALUE_FORMAT));
 		helper.validate();
@@ -134,16 +134,16 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
 	public static class DynamicTableSourceMock implements ScanTableSource {
 
 		public final String target;
-		public final @Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat;
-		public final ScanFormat<DeserializationSchema<RowData>> sourceValueFormat;
+		public final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyFormat;
+		public final DecodingFormat<DeserializationSchema<RowData>> valueFormat;
 
 		DynamicTableSourceMock(
 				String target,
-				@Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat,
-				ScanFormat<DeserializationSchema<RowData>> sourceValueFormat) {
+				@Nullable DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+				DecodingFormat<DeserializationSchema<RowData>> valueFormat) {
 			this.target = target;
-			this.sourceKeyFormat = sourceKeyFormat;
-			this.sourceValueFormat = sourceValueFormat;
+			this.keyFormat = keyFormat;
+			this.valueFormat = valueFormat;
 		}
 
 		@Override
@@ -152,7 +152,7 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
 		}
 
 		@Override
-		public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+		public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
 			return null;
 		}
 
@@ -176,13 +176,13 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
 			}
 			DynamicTableSourceMock that = (DynamicTableSourceMock) o;
 			return target.equals(that.target) &&
-				Objects.equals(sourceKeyFormat, that.sourceKeyFormat) &&
-				sourceValueFormat.equals(that.sourceValueFormat);
+				Objects.equals(keyFormat, that.keyFormat) &&
+				valueFormat.equals(that.valueFormat);
 		}
 
 		@Override
 		public int hashCode() {
-			return Objects.hash(target, sourceKeyFormat, sourceValueFormat);
+			return Objects.hash(target, keyFormat, valueFormat);
 		}
 	}
 
@@ -197,18 +197,18 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
 
 		public final String target;
 		public final Long bufferSize;
-		public final @Nullable SinkFormat<SerializationSchema<RowData>> sinkKeyFormat;
-		public final SinkFormat<SerializationSchema<RowData>> sinkValueFormat;
+		public final @Nullable EncodingFormat<SerializationSchema<RowData>> keyFormat;
+		public final EncodingFormat<SerializationSchema<RowData>> valueFormat;
 
 		DynamicTableSinkMock(
 				String target,
 				Long bufferSize,
-				@Nullable SinkFormat<SerializationSchema<RowData>> sinkKeyFormat,
-				SinkFormat<SerializationSchema<RowData>> sinkValueFormat) {
+				@Nullable EncodingFormat<SerializationSchema<RowData>> keyFormat,
+				EncodingFormat<SerializationSchema<RowData>> valueFormat) {
 			this.target = target;
 			this.bufferSize = bufferSize;
-			this.sinkKeyFormat = sinkKeyFormat;
-			this.sinkValueFormat = sinkValueFormat;
+			this.keyFormat = keyFormat;
+			this.valueFormat = valueFormat;
 		}
 
 		@Override
@@ -242,13 +242,13 @@ public final class TestDynamicTableFactory implements DynamicTableSourceFactory,
 			DynamicTableSinkMock that = (DynamicTableSinkMock) o;
 			return target.equals(that.target) &&
 				bufferSize.equals(that.bufferSize) &&
-				Objects.equals(sinkKeyFormat, that.sinkKeyFormat) &&
-				sinkValueFormat.equals(that.sinkValueFormat);
+				Objects.equals(keyFormat, that.keyFormat) &&
+				valueFormat.equals(that.valueFormat);
 		}
 
 		@Override
 		public int hashCode() {
-			return Objects.hash(target, bufferSize, sinkKeyFormat, sinkValueFormat);
+			return Objects.hash(target, bufferSize, keyFormat, valueFormat);
 		}
 	}
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
index f274b8e..97dcbb1 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestFormatFactory.java
@@ -24,10 +24,10 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.ScanFormat;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
@@ -53,19 +53,19 @@ public class TestFormatFactory implements DeserializationFormatFactory, Serializ
 		.defaultValue(false);
 
 	@Override
-	public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+	public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatConfig) {
 		FactoryUtil.validateFactoryOptions(this, formatConfig);
-		return new ScanFormatMock(formatConfig.get(DELIMITER), formatConfig.get(FAIL_ON_MISSING));
+		return new DecodingFormatMock(formatConfig.get(DELIMITER), formatConfig.get(FAIL_ON_MISSING));
 	}
 
 	@Override
-	public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+	public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
 			DynamicTableFactory.Context context,
 			ReadableConfig formatConfig) {
 		FactoryUtil.validateFactoryOptions(this, formatConfig);
-		return new SinkFormatMock(formatConfig.get(DELIMITER));
+		return new EncodingFormatMock(formatConfig.get(DELIMITER));
 	}
 
 	@Override
@@ -92,21 +92,21 @@ public class TestFormatFactory implements DeserializationFormatFactory, Serializ
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * {@link ScanFormat} for testing.
+	 * {@link DecodingFormat} for testing.
 	 */
-	public static class ScanFormatMock implements ScanFormat<DeserializationSchema<RowData>> {
+	public static class DecodingFormatMock implements DecodingFormat<DeserializationSchema<RowData>> {
 
 		public final String delimiter;
 		public final Boolean failOnMissing;
 
-		public ScanFormatMock(String delimiter, Boolean failOnMissing) {
+		public DecodingFormatMock(String delimiter, Boolean failOnMissing) {
 			this.delimiter = delimiter;
 			this.failOnMissing = failOnMissing;
 		}
 
 		@Override
-		public DeserializationSchema<RowData> createScanFormat(
-				ScanTableSource.Context context,
+		public DeserializationSchema<RowData> createRuntimeDecoder(
+				DynamicTableSource.Context context,
 				DataType producedDataType) {
 			return null;
 		}
@@ -124,7 +124,7 @@ public class TestFormatFactory implements DeserializationFormatFactory, Serializ
 			if (o == null || getClass() != o.getClass()) {
 				return false;
 			}
-			ScanFormatMock that = (ScanFormatMock) o;
+			DecodingFormatMock that = (DecodingFormatMock) o;
 			return delimiter.equals(that.delimiter) && failOnMissing.equals(that.failOnMissing);
 		}
 
@@ -139,18 +139,18 @@ public class TestFormatFactory implements DeserializationFormatFactory, Serializ
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * {@link SinkFormat} for testing.
+	 * {@link EncodingFormat} for testing.
 	 */
-	public static class SinkFormatMock implements SinkFormat<SerializationSchema<RowData>> {
+	public static class EncodingFormatMock implements EncodingFormat<SerializationSchema<RowData>> {
 
 		public final String delimiter;
 
-		public SinkFormatMock(String delimiter) {
+		public EncodingFormatMock(String delimiter) {
 			this.delimiter = delimiter;
 		}
 
 		@Override
-		public SerializationSchema<RowData> createSinkFormat(
+		public SerializationSchema<RowData> createRuntimeEncoder(
 				DynamicTableSink.Context context,
 				DataType consumeDataType) {
 			return null;
@@ -169,7 +169,7 @@ public class TestFormatFactory implements DeserializationFormatFactory, Serializ
 			if (o == null || getClass() != o.getClass()) {
 				return false;
 			}
-			SinkFormatMock that = (SinkFormatMock) o;
+			EncodingFormatMock that = (EncodingFormatMock) o;
 			return delimiter.equals(that.delimiter);
 		}
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index c913b10..920cba6 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -373,7 +373,7 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 
 		@SuppressWarnings("unchecked")
 		@Override
-		public ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.Context runtimeProviderContext) {
+		public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
 			TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext
 				.createTypeInformation(physicalSchema.toRowDataType())
 				.createSerializer(new ExecutionConfig());
@@ -398,7 +398,7 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 
 		@SuppressWarnings({"unchecked", "rawtypes"})
 		@Override
-		public LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.Context context) {
+		public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
 			if (lookupFunctionClass != null) {
 				// use the specified lookup function
 				try {
@@ -500,7 +500,7 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 	public static class MockedLookupTableSource implements LookupTableSource {
 
 		@Override
-		public LookupRuntimeProvider getLookupRuntimeProvider(Context context) {
+		public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
 			return null;
 		}
 
@@ -526,7 +526,7 @@ public final class TestValuesTableFactory implements DynamicTableSourceFactory,
 		}
 
 		@Override
-		public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
+		public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
 			return null;
 		}
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java
index 4fd5029..d616a2c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/LookupRuntimeProviderContext.java
@@ -19,16 +19,21 @@
 package org.apache.flink.table.runtime.connector.source;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeTransformations;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import static org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo;
 
 /**
  * Implementation of {@link LookupTableSource.Context}.
  */
 @Internal
-public final class LookupRuntimeProviderContext implements LookupTableSource.Context {
+public final class LookupRuntimeProviderContext implements LookupTableSource.LookupContext {
 
 	private final int[][] lookupKeys;
 
@@ -42,6 +47,14 @@ public final class LookupRuntimeProviderContext implements LookupTableSource.Con
 	}
 
 	@Override
+	public TypeInformation<?> createTypeInformation(DataType producedDataType) {
+		final DataType internalDataType = DataTypeUtils.transform(
+			producedDataType,
+			TypeTransformations.TO_INTERNAL_CLASS);
+		return fromDataTypeToTypeInfo(internalDataType);
+	}
+
+	@Override
 	public DataStructureConverter createDataStructureConverter(DataType producedDataType) {
 		return new DataStructureConverterWrapper(DataStructureConverters.getConverter(producedDataType));
 	}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java
index 8de2eb5..ad0131e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/connector/source/ScanRuntimeProviderContext.java
@@ -33,7 +33,7 @@ import static org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fro
  * Implementation of {@link ScanTableSource.Context}.
  */
 @Internal
-public final class ScanRuntimeProviderContext implements ScanTableSource.Context {
+public final class ScanRuntimeProviderContext implements ScanTableSource.ScanContext {
 
 	public static final ScanRuntimeProviderContext INSTANCE = new ScanRuntimeProviderContext();