You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/02/18 10:12:18 UTC

[flink-table-store] branch master updated: [FLINK-26103] Introduce log store and implement Kafka log store

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 54ca3dc  [FLINK-26103] Introduce log store and implement Kafka log store
54ca3dc is described below

commit 54ca3dc2f6ea3b40beb24ada3539f6db0bce989f
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Feb 18 18:12:13 2022 +0800

    [FLINK-26103] Introduce log store and implement Kafka log store
    
    This closes #20
---
 .../store/connector/sink/StoreSinkWriter.java      |   2 +-
 .../flink/table/store/log/LogInitContext.java      |  87 +++++++++
 .../apache/flink/table/store/log/LogOptions.java   | 192 ++++++++++++++++++
 .../flink/table/store/log/LogSinkProvider.java     |  52 +++++
 .../flink/table/store/log/LogSourceProvider.java   |  43 ++++
 .../table/store/log/LogStoreTableFactory.java      | 132 +++++++++++++
 .../apache/flink/table/store/sink/SinkRecord.java  |  13 +-
 .../table/store/sink/SinkRecordConverter.java      |  27 ++-
 .../flink/table/store/utils/OptionsUtils.java      |  32 +++
 flink-table-store-kafka/pom.xml                    | 160 +++++++++++++++
 .../kafka/KafkaLogKeyedDeserializationSchema.java  | 106 ++++++++++
 .../flink/table/store/kafka/KafkaLogOptions.java   |  32 +++
 .../store/kafka/KafkaLogSerializationSchema.java   |  86 ++++++++
 .../table/store/kafka/KafkaLogSinkProvider.java    | 106 ++++++++++
 .../table/store/kafka/KafkaLogSourceProvider.java  | 144 ++++++++++++++
 .../table/store/kafka/KafkaLogStoreFactory.java    | 208 ++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 .../flink/table/store/kafka/KafkaLogITCase.java    | 217 +++++++++++++++++++++
 .../store/kafka/KafkaLogSerializationTest.java     | 146 ++++++++++++++
 .../flink/table/store/kafka/KafkaLogTestUtils.java | 190 ++++++++++++++++++
 .../table/store/kafka/KafkaTableTestBase.java      | 194 ++++++++++++++++++
 .../table/store/kafka/TestOffsetsLogSink.java      | 105 ++++++++++
 .../src/test/resources/log4j2-test.properties      |  38 ++++
 pom.xml                                            |   3 +
 24 files changed, 2312 insertions(+), 19 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 8cc88a1..dc8c993 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -97,7 +97,7 @@ public class StoreSinkWriter<WriterStateT>
     }
 
     private void writeToFileStore(RecordWriter writer, SinkRecord record) throws Exception {
-        switch (record.rowKind()) {
+        switch (record.row().getRowKind()) {
             case INSERT:
             case UPDATE_AFTER:
                 if (record.key().getArity() == 0) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java
new file mode 100644
index 0000000..a5983a9
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.function.Consumer;
+
+/** A delegating {@link InitContext} that also supplies {@link InitContext#metadataConsumer()}. */
+public class LogInitContext implements InitContext {
+
+    private final InitContext context;
+    private final Consumer<?> metaConsumer;
+
+    public LogInitContext(InitContext context, Consumer<?> metaConsumer) {
+        this.context = context;
+        this.metaConsumer = metaConsumer;
+    }
+
+    @Override
+    public UserCodeClassLoader getUserCodeClassLoader() {
+        return context.getUserCodeClassLoader();
+    }
+
+    @Override
+    public MailboxExecutor getMailboxExecutor() {
+        return context.getMailboxExecutor();
+    }
+
+    @Override
+    public ProcessingTimeService getProcessingTimeService() {
+        return context.getProcessingTimeService();
+    }
+
+    @Override
+    public int getSubtaskId() {
+        return context.getSubtaskId();
+    }
+
+    @Override
+    public int getNumberOfParallelSubtasks() {
+        return context.getNumberOfParallelSubtasks();
+    }
+
+    @Override
+    public SinkWriterMetricGroup metricGroup() {
+        return context.metricGroup();
+    }
+
+    @Override
+    public OptionalLong getRestoredCheckpointId() {
+        return context.getRestoredCheckpointId();
+    }
+
+    @Override
+    public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
+        return context.asSerializationSchemaInitializationContext();
+    }
+
+    @Override
+    public Optional<Consumer<?>> metadataConsumer() {
+        return Optional.of(metaConsumer);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
new file mode 100644
index 0000000..3f36945
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.InlineElement;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
+
+/** Options for log store. */
+public class LogOptions {
+
+    public static final ConfigOption<LogStartupMode> SCAN =
+            ConfigOptions.key("scan")
+                    .enumType(LogStartupMode.class)
+                    .defaultValue(LogStartupMode.FULL)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Specify the startup mode for log consumer.")
+                                    .linebreak()
+                                    .list(formatEnumOption(LogStartupMode.FULL))
+                                    .list(formatEnumOption(LogStartupMode.LATEST))
+                                    .list(formatEnumOption(LogStartupMode.FROM_TIMESTAMP))
+                                    .build());
+
+    public static final ConfigOption<Long> SCAN_TIMESTAMP_MILLS =
+            ConfigOptions.key("scan.timestamp-millis")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp used in case of \"from-timestamp\" scan mode");
+
+    public static final ConfigOption<Duration> RETENTION =
+            ConfigOptions.key("retention")
+                    .durationType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "It means how long changes log will be kept. The default value is from the log system cluster.");
+
+    public static final ConfigOption<LogConsistency> CONSISTENCY =
+            ConfigOptions.key("consistency")
+                    .enumType(LogConsistency.class)
+                    .defaultValue(LogConsistency.TRANSACTIONAL)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Specify the log consistency mode for table.")
+                                    .linebreak()
+                                    .list(
+                                            formatEnumOption(LogConsistency.TRANSACTIONAL),
+                                            formatEnumOption(LogConsistency.EVENTUAL))
+                                    .build());
+
+    public static final ConfigOption<LogChangelogMode> CHANGELOG_MODE =
+            ConfigOptions.key("changelog-mode")
+                    .enumType(LogChangelogMode.class)
+                    .defaultValue(LogChangelogMode.AUTO)
+                    .withDescription(
+                            Description.builder()
+                                    .text("Specify the log changelog mode for table.")
+                                    .linebreak()
+                                    .list(
+                                            formatEnumOption(LogChangelogMode.AUTO),
+                                            formatEnumOption(LogChangelogMode.ALL),
+                                            formatEnumOption(LogChangelogMode.UPSERT))
+                                    .build());
+
+    public static final ConfigOption<String> KEY_FORMAT =
+            ConfigOptions.key("key.format")
+                    .stringType()
+                    .defaultValue("json")
+                    .withDescription(
+                            "Specify the key message format of log system with primary key.");
+
+    public static final ConfigOption<String> FORMAT =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .defaultValue("debezium-json")
+                    .withDescription("Specify the message format of log system.");
+
+    /** Specifies the startup mode for log consumer. */
+    public enum LogStartupMode implements DescribedEnum {
+        FULL(
+                "full",
+                "Perform a snapshot on the table upon first startup,"
+                        + " and continue to read the latest changes."),
+
+        LATEST("latest", "Start from the latest."),
+
+        FROM_TIMESTAMP("from-timestamp", "Start from user-supplied timestamp.");
+
+        private final String value;
+        private final String description;
+
+        LogStartupMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+
+    /** Specifies the log consistency mode for table. */
+    public enum LogConsistency implements DescribedEnum {
+        TRANSACTIONAL(
+                "transactional",
+                "Only the data after the checkpoint can be seen by readers, the latency depends on checkpoint interval."),
+
+        EVENTUAL(
+                "eventual",
+                "Immediate data visibility, you may see some intermediate states, "
+                        + "but eventually the right results will be produced, only works for table with primary key.");
+
+        private final String value;
+        private final String description;
+
+        LogConsistency(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+
+    /** Specifies the log changelog mode for table. */
+    public enum LogChangelogMode implements DescribedEnum {
+        AUTO("auto", "Upsert for table with primary key, all for table without primary key.."),
+
+        ALL("all", "The log system stores all changes including UPDATE_BEFORE."),
+
+        UPSERT(
+                "upsert",
+                "The log system does not store the UPDATE_BEFORE changes, the log consumed job"
+                        + " will automatically add the normalized node, relying on the state"
+                        + " to generate the required update_before.");
+
+        private final String value;
+        private final String description;
+
+        LogChangelogMode(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
new file mode 100644
index 0000000..8a091fa
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.table.store.sink.SinkRecord;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+/** A {@link Serializable} sink provider for log store. */
+public interface LogSinkProvider extends Serializable {
+
+    /** Creates a {@link Sink} instance. */
+    Sink<SinkRecord> createSink();
+
+    /**
+     * Creates a metadata consumer for {@link Sink.InitContext#metadataConsumer()} from the given
+     * {@link WriteCallback}.
+     */
+    Consumer<?> createMetadataConsumer(WriteCallback callback);
+
+    /**
+     * A callback interface that the user can implement to know the offset of the bucket when the
+     * request is complete.
+     */
+    interface WriteCallback {
+
+        /**
+         * A callback method the user can implement to provide asynchronous handling of request
+         * completion. This method will be called when the record sent to the server has been
+         * acknowledged.
+         */
+        void onCompletion(int bucket, long offset);
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSourceProvider.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSourceProvider.java
new file mode 100644
index 0000000..94dd604
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSourceProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.base.source.hybrid.HybridSource;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A {@link Serializable} source provider for log store.
+ *
+ * <p>This provider is serializable, so it can be wrapped as a {@link HybridSource.SourceFactory}.
+ */
+public interface LogSourceProvider extends Serializable {
+
+    /**
+     * Creates a {@link Source} instance.
+     *
+     * @param bucketOffsets optional (may be null); pass it only to specify the startup offsets.
+     */
+    Source<RowData, ?, ?> createSource(@Nullable Map<Integer, Long> bucketOffsets);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
new file mode 100644
index 0000000..35c435d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Base interface for configuring a default log table connector. The log table is used by the
+ * managed table factory.
+ *
+ * <p>Log tables only process unbounded data; they support streaming reads and streaming
+ * writes.
+ */
+public interface LogStoreTableFactory extends DynamicTableFactory {
+
+    /** Notifies the listener that a table creation occurred. */
+    void onCreateTable(Context context, int numBucket);
+
+    /** Notifies the listener that a table drop occurred. */
+    void onDropTable(Context context);
+
+    /**
+     * Creates a {@link LogSourceProvider} instance from a {@link CatalogTable} and additional
+     * context information.
+     */
+    LogSourceProvider createSourceProvider(Context context, SourceContext sourceContext);
+
+    /**
+     * Creates a {@link LogSinkProvider} instance from a {@link CatalogTable} and additional context
+     * information.
+     */
+    LogSinkProvider createSinkProvider(Context context, SinkContext sinkContext);
+
+    /** Context for creating the runtime source. */
+    interface SourceContext extends DynamicTableSource.Context {}
+
+    /** Context for creating the runtime sink. */
+    interface SinkContext extends DynamicTableSink.Context {}
+
+    // --------------------------------------------------------------------------------------------
+
+    static LogStoreTableFactory discoverLogStoreFactory(ClassLoader cl, String identifier) {
+        return FactoryUtil.discoverFactory(cl, LogStoreTableFactory.class, identifier);
+    }
+
+    static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(
+            TableFactoryHelper helper) {
+        DecodingFormat<DeserializationSchema<RowData>> format =
+                helper.discoverDecodingFormat(
+                        DeserializationFormatFactory.class, LogOptions.KEY_FORMAT);
+        validateKeyFormat(format, helper.getOptions().get(LogOptions.KEY_FORMAT));
+        return format;
+    }
+
+    static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat(
+            TableFactoryHelper helper) {
+        EncodingFormat<SerializationSchema<RowData>> format =
+                helper.discoverEncodingFormat(
+                        SerializationFormatFactory.class, LogOptions.KEY_FORMAT);
+        validateKeyFormat(format, helper.getOptions().get(LogOptions.KEY_FORMAT));
+        return format;
+    }
+
+    static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
+            TableFactoryHelper helper) {
+        DecodingFormat<DeserializationSchema<RowData>> format =
+                helper.discoverDecodingFormat(
+                        DeserializationFormatFactory.class, LogOptions.FORMAT);
+        validateValueFormat(format, helper.getOptions().get(LogOptions.FORMAT));
+        return format;
+    }
+
+    static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
+            TableFactoryHelper helper) {
+        EncodingFormat<SerializationSchema<RowData>> format =
+                helper.discoverEncodingFormat(SerializationFormatFactory.class, LogOptions.FORMAT);
+        validateValueFormat(format, helper.getOptions().get(LogOptions.FORMAT));
+        return format;
+    }
+
+    static void validateKeyFormat(Format format, String name) {
+        if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            throw new ValidationException(
+                    String.format(
+                            "A key format should only deal with INSERT-only records. "
+                                    + "But %s has a changelog mode of %s.",
+                            name, format.getChangelogMode()));
+        }
+    }
+
+    static void validateValueFormat(Format format, String name) {
+        if (!format.getChangelogMode().equals(ChangelogMode.all())) {
+            throw new ValidationException(
+                    String.format(
+                            "A value format should deal with all records. "
+                                    + "But %s has a changelog mode of %s.",
+                            name, format.getChangelogMode()));
+        }
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
index 787e457..25b08c8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
@@ -24,27 +24,22 @@ import org.apache.flink.types.RowKind;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
-/** A sink records contains key, value and partition, bucket, row kind information. */
+/** A sink record contains key, row and partition, bucket information. */
 public class SinkRecord {
 
     private final BinaryRowData partition;
 
     private final int bucket;
 
-    private final RowKind rowKind;
-
     private final BinaryRowData key;
 
     private final RowData row;
 
-    public SinkRecord(
-            BinaryRowData partition, int bucket, RowKind rowKind, BinaryRowData key, RowData row) {
+    public SinkRecord(BinaryRowData partition, int bucket, BinaryRowData key, RowData row) {
         checkArgument(partition.getRowKind() == RowKind.INSERT);
         checkArgument(key.getRowKind() == RowKind.INSERT);
-        checkArgument(row.getRowKind() == RowKind.INSERT);
         this.partition = partition;
         this.bucket = bucket;
-        this.rowKind = rowKind;
         this.key = key;
         this.row = row;
     }
@@ -64,8 +59,4 @@ public class SinkRecord {
     public RowData row() {
         return row;
     }
-
-    public RowKind rowKind() {
-        return rowKind;
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
index 2f17fb9..55c1bf8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
@@ -21,17 +21,18 @@ package org.apache.flink.table.store.sink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.Projection;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.utils.ProjectionUtils;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 
+import java.util.stream.IntStream;
+
 /** Converter for converting {@link RowData} to {@link SinkRecord}. */
 public class SinkRecordConverter {
 
     private final int numBucket;
 
-    private final RowDataSerializer rowSerializer;
+    private final Projection<RowData, BinaryRowData> allProjection;
 
     private final Projection<RowData, BinaryRowData> partProjection;
 
@@ -39,18 +40,30 @@ public class SinkRecordConverter {
 
     public SinkRecordConverter(int numBucket, RowType inputType, int[] partitions, int[] keys) {
         this.numBucket = numBucket;
-        this.rowSerializer = new RowDataSerializer(inputType);
+        this.allProjection =
+                ProjectionUtils.newProjection(
+                        inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
         this.partProjection = ProjectionUtils.newProjection(inputType, partitions);
         this.keyProjection = ProjectionUtils.newProjection(inputType, keys);
     }
 
     public SinkRecord convert(RowData row) {
-        RowKind rowKind = row.getRowKind();
-        row.setRowKind(RowKind.INSERT);
         BinaryRowData partition = partProjection.apply(row);
         BinaryRowData key = keyProjection.apply(row);
-        int hash = key.getArity() == 0 ? rowSerializer.toBinaryRow(row).hashCode() : key.hashCode();
+        int hash = key.getArity() == 0 ? hashRow(row) : key.hashCode();
         int bucket = Math.abs(hash % numBucket);
-        return new SinkRecord(partition, bucket, rowKind, key, row);
+        return new SinkRecord(partition, bucket, key, row);
+    }
+
+    private int hashRow(RowData row) {
+        if (row instanceof BinaryRowData) {
+            RowKind rowKind = row.getRowKind();
+            row.setRowKind(RowKind.INSERT);
+            int hash = row.hashCode();
+            row.setRowKind(rowKind);
+            return hash;
+        } else {
+            return allProjection.apply(row).hashCode();
+        }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/utils/OptionsUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/utils/OptionsUtils.java
new file mode 100644
index 0000000..3436e3c
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/utils/OptionsUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.TextElement;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Utils for options. */
+public class OptionsUtils {
+
+    public static TextElement formatEnumOption(DescribedEnum describedEnum) {
+        return text("\"%s\": %s", text(describedEnum.toString()), describedEnum.getDescription());
+    }
+}
diff --git a/flink-table-store-kafka/pom.xml b/flink-table-store-kafka/pom.xml
new file mode 100644
index 0000000..75e6811
--- /dev/null
+++ b/flink-table-store-kafka/pom.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-table-store-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-store-kafka</artifactId>
+    <name>Flink Table Store : Kafka</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- flink dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <!-- include Kafka 2.4.1 server for tests -->
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
+            <version>2.4.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.dataformat</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.module</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.datatype</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit4.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java
new file mode 100644
index 0000000..50485c6
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.stream.IntStream;
+
+/**
+ * A {@link KafkaDeserializationSchema} for the table with primary key in log store.
+ *
+ * <p>A record whose value is {@code null} is emitted as a {@link RowKind#DELETE} row in which only
+ * the primary key fields are set, taken from the deserialized record key. Any other record is
+ * handed to the value deserializer unchanged.
+ */
+public class KafkaLogKeyedDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final TypeInformation<RowData> producedType;
+
+    /** Number of fields of the produced row. */
+    private final int fieldCount;
+
+    /** Positions of the primary key fields inside the produced row. */
+    private final int[] primaryKey;
+
+    private final DeserializationSchema<RowData> keyDeserializer;
+    private final DeserializationSchema<RowData> valueDeserializer;
+
+    /** Getter {@code i} reads field {@code i} of the key row (type of row field primaryKey[i]). */
+    private final RowData.FieldGetter[] keyFieldGetters;
+
+    /**
+     * Creates the schema.
+     *
+     * @param physicalType type of the produced row; its children are the row fields
+     * @param primaryKey positions of the primary key fields in {@code physicalType}; field {@code
+     *     i} of the key row is written to position {@code primaryKey[i]} of the produced row
+     * @param keyDeserializer deserializer applied to the record key of value-less records
+     * @param valueDeserializer deserializer applied to the record value of all other records
+     */
+    public KafkaLogKeyedDeserializationSchema(
+            DataType physicalType,
+            int[] primaryKey,
+            DeserializationSchema<RowData> keyDeserializer,
+            DeserializationSchema<RowData> valueDeserializer) {
+        this.primaryKey = primaryKey;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.producedType = InternalTypeInfo.of(physicalType.getLogicalType());
+        this.fieldCount = physicalType.getChildren().size();
+        this.keyFieldGetters =
+                IntStream.range(0, primaryKey.length)
+                        .mapToObj(
+                                i ->
+                                        RowData.createFieldGetter(
+                                                physicalType
+                                                        .getChildren()
+                                                        .get(primaryKey[i])
+                                                        .getLogicalType(),
+                                                i))
+                        .toArray(RowData.FieldGetter[]::new);
+    }
+
+    /** Opens both the key and the value deserializer with the given context. */
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) throws Exception {
+        keyDeserializer.open(context);
+        valueDeserializer.open(context);
+    }
+
+    /** Always returns {@code false}. */
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    /**
+     * Not supported by this schema; use {@link #deserialize(ConsumerRecord, Collector)}.
+     *
+     * @throws RuntimeException always
+     */
+    @Override
+    public RowData deserialize(ConsumerRecord<byte[], byte[]> record) {
+        // The message names the collector-based overload of this very interface, which is the
+        // only variant implemented here.
+        throw new RuntimeException(
+                "Please invoke KafkaDeserializationSchema#deserialize("
+                        + "ConsumerRecord<byte[], byte[]>, Collector<RowData>) instead.");
+    }
+
+    /**
+     * Deserializes one Kafka record into the collector.
+     *
+     * @param record the consumed record
+     * @param out receives the resulting row(s)
+     */
+    @Override
+    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> out)
+            throws Exception {
+        if (record.value() == null) {
+            // No value: build a DELETE row that carries only the primary key fields.
+            RowData key = keyDeserializer.deserialize(record.key());
+            GenericRowData value = new GenericRowData(RowKind.DELETE, fieldCount);
+            for (int i = 0; i < primaryKey.length; i++) {
+                value.setField(primaryKey[i], keyFieldGetters[i].getFieldOrNull(key));
+            }
+            out.collect(value);
+        } else {
+            valueDeserializer.deserialize(record.value(), out);
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedType;
+    }
+}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
new file mode 100644
index 0000000..8882488
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for kafka log. */
+public class KafkaLogOptions {
+
+    /**
+     * Option {@code kafka.bootstrap.servers}: a string option declared without a default value.
+     */
+    public static final ConfigOption<String> BOOTSTRAP_SERVERS =
+            ConfigOptions.key("kafka.bootstrap.servers")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Required Kafka server connection string");
+
+    /** Holder of option constants only; this class is not meant to be instantiated. */
+    private KafkaLogOptions() {}
+}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
new file mode 100644
index 0000000..e9b4081
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.types.RowKind;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link KafkaRecordSerializationSchema} for the table in log store. It converts each {@link
+ * SinkRecord} into a {@link ProducerRecord} for the configured topic.
+ */
+public class KafkaLogSerializationSchema implements KafkaRecordSerializationSchema<SinkRecord> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String topic;
+    @Nullable private final SerializationSchema<RowData> keySerializer;
+    private final SerializationSchema<RowData> valueSerializer;
+    private final LogChangelogMode changelogMode;
+
+    /**
+     * Creates the schema.
+     *
+     * @param topic topic every produced record is addressed to
+     * @param keySerializer serializer for the record key, or {@code null} if no key is written
+     * @param valueSerializer serializer for the record value
+     * @param changelogMode changelog mode of the log
+     * @throws IllegalArgumentException if the upsert changelog mode is requested without a key
+     *     serializer
+     */
+    public KafkaLogSerializationSchema(
+            String topic,
+            @Nullable SerializationSchema<RowData> keySerializer,
+            SerializationSchema<RowData> valueSerializer,
+            LogChangelogMode changelogMode) {
+        // Reject the unsupported combination up front.
+        if (keySerializer == null && changelogMode == LogChangelogMode.UPSERT) {
+            throw new IllegalArgumentException(
+                    "Can not use upsert changelog mode for non-pk table.");
+        }
+        this.topic = topic;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.changelogMode = changelogMode;
+    }
+
+    /** Opens the key serializer (when present) and then the value serializer. */
+    @Override
+    public void open(
+            SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
+            throws Exception {
+        if (keySerializer != null) {
+            keySerializer.open(context);
+        }
+        valueSerializer.open(context);
+    }
+
+    /**
+     * Builds the producer record for one element, using the element's bucket as partition.
+     *
+     * <p>Without a key serializer the record has no key and always carries the serialized row.
+     * With a key serializer the key is always written; the value is written when the changelog
+     * mode is {@code ALL} or the row kind is {@code INSERT} or {@code UPDATE_AFTER}, and is left
+     * {@code null} otherwise.
+     */
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(
+            SinkRecord element, KafkaSinkContext context, Long timestamp) {
+        final RowKind rowKind = element.row().getRowKind();
+
+        final byte[] serializedKey;
+        final byte[] serializedValue;
+        if (keySerializer == null) {
+            serializedKey = null;
+            serializedValue = valueSerializer.serialize(element.row());
+        } else {
+            serializedKey = keySerializer.serialize(element.key());
+            boolean carriesValue =
+                    changelogMode == LogChangelogMode.ALL
+                            || rowKind == RowKind.INSERT
+                            || rowKind == RowKind.UPDATE_AFTER;
+            serializedValue = carriesValue ? valueSerializer.serialize(element.row()) : null;
+        }
+        return new ProducerRecord<>(topic, element.bucket(), serializedKey, serializedValue);
+    }
+}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
new file mode 100644
index 0000000..85b9cc0
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.sink.SinkRecord;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import javax.annotation.Nullable;
+
+import java.util.Properties;
+import java.util.function.Consumer;
+
+/** A Kafka {@link LogSinkProvider}. */
+public class KafkaLogSinkProvider implements LogSinkProvider {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String topic;
+
+    private final Properties properties;
+
+    @Nullable private final SerializationSchema<RowData> keySerializer;
+
+    private final SerializationSchema<RowData> valueSerializer;
+
+    private final LogConsistency consistency;
+
+    private final LogChangelogMode changelogMode;
+
+    /**
+     * Creates the provider.
+     *
+     * @param topic topic to write to
+     * @param properties Kafka producer properties; the bootstrap servers are read from them
+     * @param keySerializer serializer for the record key, may be {@code null}
+     * @param valueSerializer serializer for the record value
+     * @param consistency consistency mode, selects the delivery guarantee of the sink
+     * @param changelogMode changelog mode passed on to the serialization schema
+     */
+    public KafkaLogSinkProvider(
+            String topic,
+            Properties properties,
+            @Nullable SerializationSchema<RowData> keySerializer,
+            SerializationSchema<RowData> valueSerializer,
+            LogConsistency consistency,
+            LogChangelogMode changelogMode) {
+        this.topic = topic;
+        this.properties = properties;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+        this.consistency = consistency;
+        this.changelogMode = changelogMode;
+    }
+
+    /**
+     * Builds the {@link KafkaSink}.
+     *
+     * <p>{@code TRANSACTIONAL} selects {@link DeliveryGuarantee#EXACTLY_ONCE} with the
+     * transactional id prefix {@code "log-store-" + topic}; {@code EVENTUAL} selects {@link
+     * DeliveryGuarantee#AT_LEAST_ONCE}.
+     *
+     * @throws IllegalArgumentException if {@code EVENTUAL} is used without a key serializer
+     */
+    @Override
+    public KafkaSink<SinkRecord> createSink() {
+        KafkaSinkBuilder<SinkRecord> sinkBuilder = KafkaSink.builder();
+        switch (consistency) {
+            case EVENTUAL:
+                if (keySerializer == null) {
+                    throw new IllegalArgumentException(
+                            "Can not use EVENTUAL consistency mode for non-pk table.");
+                }
+                sinkBuilder.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE);
+                break;
+            case TRANSACTIONAL:
+                sinkBuilder.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
+                sinkBuilder.setTransactionalIdPrefix("log-store-" + topic);
+                break;
+        }
+
+        String bootstrapServers =
+                properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString();
+        sinkBuilder.setBootstrapServers(bootstrapServers);
+        sinkBuilder.setKafkaProducerConfig(properties);
+        sinkBuilder.setRecordSerializer(createSerializationSchema());
+        return sinkBuilder.build();
+    }
+
+    /**
+     * Returns a consumer that reports the partition and offset of each {@link RecordMetadata} to
+     * the given callback.
+     */
+    @Override
+    public Consumer<RecordMetadata> createMetadataConsumer(WriteCallback callback) {
+        return metadata -> callback.onCompletion(metadata.partition(), metadata.offset());
+    }
+
+    /** Creates the record serialization schema from this provider's settings. */
+    @VisibleForTesting
+    KafkaLogSerializationSchema createSerializationSchema() {
+        return new KafkaLogSerializationSchema(
+                topic, keySerializer, valueSerializer, changelogMode);
+    }
+}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
new file mode 100644
index 0000000..68dbe63
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+
+/** A Kafka {@link LogSourceProvider}. */
+public class KafkaLogSourceProvider implements LogSourceProvider {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String topic;
+
+    private final Properties properties;
+
+    private final DataType physicalType;
+
+    /** Primary key field positions; an empty array selects the value-only deserializer. */
+    private final int[] primaryKey;
+
+    @Nullable private final DeserializationSchema<RowData> keyDeserializer;
+
+    private final DeserializationSchema<RowData> valueDeserializer;
+
+    private final LogConsistency consistency;
+
+    private final LogStartupMode scanMode;
+
+    /** Start timestamp, only read for the {@code FROM_TIMESTAMP} startup mode. */
+    @Nullable private final Long timestampMills;
+
+    /**
+     * Creates the provider.
+     *
+     * @param topic topic to read from
+     * @param properties Kafka consumer properties; they are not modified by this class
+     * @param physicalType type of the produced rows
+     * @param primaryKey primary key field positions, empty if there is no primary key
+     * @param keyDeserializer deserializer for the record key, may be {@code null}
+     * @param valueDeserializer deserializer for the record value
+     * @param consistency consistency mode, selects the consumer isolation level
+     * @param scanMode startup mode, selects the starting offsets
+     * @param timestampMills start timestamp for the {@code FROM_TIMESTAMP} startup mode
+     */
+    public KafkaLogSourceProvider(
+            String topic,
+            Properties properties,
+            DataType physicalType,
+            int[] primaryKey,
+            @Nullable DeserializationSchema<RowData> keyDeserializer,
+            DeserializationSchema<RowData> valueDeserializer,
+            LogConsistency consistency,
+            LogStartupMode scanMode,
+            @Nullable Long timestampMills) {
+        this.topic = topic;
+        this.properties = properties;
+        this.physicalType = physicalType;
+        this.primaryKey = primaryKey;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.consistency = consistency;
+        this.scanMode = scanMode;
+        this.timestampMills = timestampMills;
+    }
+
+    /**
+     * Builds the {@link KafkaSource}.
+     *
+     * <p>{@code TRANSACTIONAL} reads with isolation level {@code read_committed}, {@code
+     * EVENTUAL} with {@code read_uncommitted}.
+     *
+     * @param bucketOffsets offsets per bucket to start from in {@code FULL} startup mode, or
+     *     {@code null}; the bucket is used as the Kafka partition number
+     * @throws IllegalArgumentException if {@code EVENTUAL} is used without a key deserializer
+     */
+    @Override
+    public KafkaSource<RowData> createSource(@Nullable Map<Integer, Long> bucketOffsets) {
+        // Work on a private copy: the isolation level is specific to the source built here and
+        // must not leak into the Properties instance that was handed to the constructor.
+        Properties sourceProperties = new Properties();
+        sourceProperties.putAll(properties);
+
+        switch (consistency) {
+            case TRANSACTIONAL:
+                // Add read committed for transactional consistency mode.
+                sourceProperties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed");
+                break;
+            case EVENTUAL:
+                if (keyDeserializer == null) {
+                    throw new IllegalArgumentException(
+                            "Can not use EVENTUAL consistency mode for non-pk table.");
+                }
+                sourceProperties.setProperty(ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+                break;
+        }
+
+        return KafkaSource.<RowData>builder()
+                .setTopics(topic)
+                .setStartingOffsets(toOffsetsInitializer(bucketOffsets))
+                .setProperties(sourceProperties)
+                .setDeserializer(createDeserializationSchema())
+                .build();
+    }
+
+    /**
+     * Creates the record deserialization schema: the keyed schema when a primary key is defined,
+     * a value-only schema otherwise.
+     */
+    @VisibleForTesting
+    KafkaRecordDeserializationSchema<RowData> createDeserializationSchema() {
+        return primaryKey.length > 0
+                ? KafkaRecordDeserializationSchema.of(
+                        new KafkaLogKeyedDeserializationSchema(
+                                physicalType, primaryKey, keyDeserializer, valueDeserializer))
+                : KafkaRecordDeserializationSchema.valueOnly(valueDeserializer);
+    }
+
+    /**
+     * Maps the startup mode to an {@link OffsetsInitializer}.
+     *
+     * <ul>
+     *   <li>{@code FULL}: the given bucket offsets, or earliest when none are given
+     *   <li>{@code LATEST}: latest
+     *   <li>{@code FROM_TIMESTAMP}: the configured timestamp
+     * </ul>
+     */
+    private OffsetsInitializer toOffsetsInitializer(@Nullable Map<Integer, Long> bucketOffsets) {
+        switch (scanMode) {
+            case FULL:
+                return bucketOffsets == null
+                        ? OffsetsInitializer.earliest()
+                        : OffsetsInitializer.offsets(toKafkaOffsets(bucketOffsets));
+            case LATEST:
+                return OffsetsInitializer.latest();
+            case FROM_TIMESTAMP:
+                if (timestampMills == null) {
+                    throw new NullPointerException(
+                            "Must specify a timestamp if you choose timestamp startup mode.");
+                }
+                return OffsetsInitializer.timestamp(timestampMills);
+            default:
+                throw new UnsupportedOperationException("Unsupported mode: " + scanMode);
+        }
+    }
+
+    /** Converts bucket offsets to offsets keyed by the topic partition of the same number. */
+    private Map<TopicPartition, Long> toKafkaOffsets(Map<Integer, Long> bucketOffsets) {
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        bucketOffsets.forEach(
+                (bucket, offset) -> offsets.put(new TopicPartition(topic, bucket), offset));
+        return offsets;
+    }
+}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
new file mode 100644
index 0000000..9208940
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
+import static org.apache.flink.table.store.log.LogOptions.FORMAT;
+import static org.apache.flink.table.store.log.LogOptions.KEY_FORMAT;
+import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import static org.apache.flink.table.store.log.LogOptions.RETENTION;
+import static org.apache.flink.table.store.log.LogOptions.SCAN;
+import static org.apache.flink.table.store.log.LogOptions.SCAN_TIMESTAMP_MILLS;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+
+/** The Kafka {@link LogStoreTableFactory} implementation. */
+public class KafkaLogStoreFactory implements LogStoreTableFactory {
+
+    public static final String IDENTIFIER = "kafka";
+
+    public static final String KAFKA_PREFIX = IDENTIFIER + ".";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(BOOTSTRAP_SERVERS);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(SCAN);
+        options.add(SCAN_TIMESTAMP_MILLS);
+        options.add(RETENTION);
+        options.add(CONSISTENCY);
+        options.add(CHANGELOG_MODE);
+        options.add(KEY_FORMAT);
+        options.add(FORMAT);
+        return options;
+    }
+
+    @Override
+    public void onCreateTable(DynamicTableFactory.Context context, int numBucket) {
+        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
+        helper.validateExcept(KAFKA_PREFIX);
+        try (AdminClient adminClient = AdminClient.create(toKafkaProperties(helper.getOptions()))) {
+            Map<String, String> configs = new HashMap<>();
+            helper.getOptions()
+                    .getOptional(RETENTION)
+                    .ifPresent(
+                            retention ->
+                                    configs.put(
+                                            TopicConfig.RETENTION_MS_CONFIG,
+                                            String.valueOf(retention.toMillis())));
+
+            NewTopic topicObj =
+                    new NewTopic(topic(context), Optional.of(numBucket), Optional.empty())
+                            .configs(configs);
+            adminClient.createTopics(Collections.singleton(topicObj)).all().get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new TableException("Error in createTopic", e);
+        }
+    }
+
+    @Override
+    public void onDropTable(DynamicTableFactory.Context context) {
+        try (AdminClient adminClient =
+                AdminClient.create(
+                        toKafkaProperties(createTableFactoryHelper(this, context).getOptions()))) {
+            adminClient.deleteTopics(Collections.singleton(topic(context))).all().get();
+        } catch (ExecutionException e) {
+            // ignore topic not exists
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                throw new TableException("Error in deleteTopic", e);
+            }
+        } catch (InterruptedException e) {
+            throw new TableException("Error in deleteTopic", e);
+        }
+    }
+
+    @Override
+    public KafkaLogSourceProvider createSourceProvider(
+            DynamicTableFactory.Context context, SourceContext sourceContext) {
+        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
+        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+        DataType physicalType = schema.toPhysicalRowDataType();
+        DeserializationSchema<RowData> keyDeserializer = null;
+        int[] primaryKey = schema.getPrimaryKeyIndexes();
+        if (primaryKey.length > 0) {
+            DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
+            keyDeserializer =
+                    LogStoreTableFactory.getKeyDecodingFormat(helper)
+                            .createRuntimeDecoder(sourceContext, keyType);
+        }
+        DeserializationSchema<RowData> valueDeserializer =
+                LogStoreTableFactory.getValueDecodingFormat(helper)
+                        .createRuntimeDecoder(sourceContext, physicalType);
+        return new KafkaLogSourceProvider(
+                topic(context),
+                toKafkaProperties(helper.getOptions()),
+                physicalType,
+                primaryKey,
+                keyDeserializer,
+                valueDeserializer,
+                helper.getOptions().get(CONSISTENCY),
+                helper.getOptions().get(SCAN),
+                helper.getOptions().get(SCAN_TIMESTAMP_MILLS));
+    }
+
+    @Override
+    public KafkaLogSinkProvider createSinkProvider(
+            DynamicTableFactory.Context context, SinkContext sinkContext) {
+        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
+        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+        DataType physicalType = schema.toPhysicalRowDataType();
+        SerializationSchema<RowData> keySerializer = null;
+        int[] primaryKey = schema.getPrimaryKeyIndexes();
+        if (primaryKey.length > 0) {
+            DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
+            keySerializer =
+                    LogStoreTableFactory.getKeyEncodingFormat(helper)
+                            .createRuntimeEncoder(sinkContext, keyType);
+        }
+        SerializationSchema<RowData> valueSerializer =
+                LogStoreTableFactory.getValueEncodingFormat(helper)
+                        .createRuntimeEncoder(sinkContext, physicalType);
+        return new KafkaLogSinkProvider(
+                topic(context),
+                toKafkaProperties(helper.getOptions()),
+                keySerializer,
+                valueSerializer,
+                helper.getOptions().get(CONSISTENCY),
+                helper.getOptions().get(CHANGELOG_MODE));
+    }
+
+    private static String topic(DynamicTableFactory.Context context) {
+        return context.getObjectIdentifier().asSummaryString();
+    }
+
+    public static Properties toKafkaProperties(ReadableConfig options) {
+        Properties properties = new Properties();
+        Map<String, String> optionMap = ((Configuration) options).toMap();
+        optionMap.keySet().stream()
+                .filter(key -> key.startsWith(KAFKA_PREFIX))
+                .forEach(
+                        key ->
+                                properties.put(
+                                        key.substring((KAFKA_PREFIX).length()),
+                                        optionMap.get(key)));
+
+        // Add read committed for transactional consistency mode.
+        if (options.get(CONSISTENCY) == LogConsistency.TRANSACTIONAL) {
+            properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed");
+        }
+        return properties;
+    }
+}
diff --git a/flink-table-store-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table-store-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..df0c6b8
--- /dev/null
+++ b/flink-table-store-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.kafka.KafkaLogStoreFactory
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
new file mode 100644
index 0000000..8b9dccf
--- /dev/null
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SOURCE_CONTEXT;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testContext;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * ITCase for {@link KafkaLogStoreFactory}.
+ *
+ * <p>The supported combinations of changelog mode, consistency mode and keyed/non-keyed table
+ * write records through the sink provider and read them back through the source provider. The
+ * unsupported combinations are expected to fail with an {@link IllegalArgumentException}.
+ */
+public class KafkaLogITCase extends KafkaTableTestBase {
+
+    private final KafkaLogStoreFactory factory = discoverKafkaLogFactory();
+
+    @Test
+    public void testDropEmpty() {
+        // Expect no exceptions to be thrown
+        factory.onDropTable(testContext(getBootstrapServers(), LogChangelogMode.AUTO, true));
+    }
+
+    @Test
+    public void testUpsertTransactionKeyed() throws Exception {
+        innerTest(
+                "UpsertTransactionKeyed",
+                LogChangelogMode.UPSERT,
+                LogConsistency.TRANSACTIONAL,
+                true);
+    }
+
+    @Test
+    public void testAllTransactionKeyed() throws Exception {
+        innerTest("AllTransactionKeyed", LogChangelogMode.ALL, LogConsistency.TRANSACTIONAL, true);
+    }
+
+    @Test
+    public void testUpsertEventualKeyed() throws Exception {
+        innerTest("UpsertEventualKeyed", LogChangelogMode.UPSERT, LogConsistency.EVENTUAL, true);
+    }
+
+    @Test
+    public void testAllEventualKeyed() throws Exception {
+        innerTest("AllEventualKeyed", LogChangelogMode.ALL, LogConsistency.EVENTUAL, true);
+    }
+
+    @Test
+    public void testAllTransactionNonKeyed() throws Exception {
+        innerTest(
+                "AllTransactionNonKeyed",
+                LogChangelogMode.ALL,
+                LogConsistency.TRANSACTIONAL,
+                false);
+    }
+
+    @Test
+    public void testUpsertTransactionNonKeyed() {
+        assertNonKeyedRejected(
+                "UpsertTransactionNonKeyed",
+                LogChangelogMode.UPSERT,
+                LogConsistency.TRANSACTIONAL,
+                "Can not use upsert changelog mode for non-pk table.");
+    }
+
+    @Test
+    public void testUpsertEventualNonKeyed() {
+        assertNonKeyedRejected(
+                "UpsertEventualNonKeyed",
+                LogChangelogMode.UPSERT,
+                LogConsistency.EVENTUAL,
+                "Can not use EVENTUAL consistency mode for non-pk table.");
+    }
+
+    @Test
+    public void testAllEventualNonKeyed() {
+        assertNonKeyedRejected(
+                "AllEventualNonKeyed",
+                LogChangelogMode.ALL,
+                LogConsistency.EVENTUAL,
+                "Can not use EVENTUAL consistency mode for non-pk table.");
+    }
+
+    /**
+     * Runs {@link #innerTest} for a non-keyed table and asserts that it fails with an {@link
+     * IllegalArgumentException} carrying exactly {@code expectedMessage}.
+     */
+    private void assertNonKeyedRejected(
+            String name,
+            LogChangelogMode changelogMode,
+            LogConsistency consistency,
+            String expectedMessage) {
+        IllegalArgumentException exception =
+                Assertions.assertThrows(
+                        IllegalArgumentException.class,
+                        () -> innerTest(name, changelogMode, consistency, false));
+        assertThat(exception.getMessage()).isEqualTo(expectedMessage);
+    }
+
+    /**
+     * Writes two batches of records and reads them back: the first batch from the beginning of the
+     * log, the second batch starting at the offsets recorded while writing the first one. The
+     * table is dropped again in any case once it has been created.
+     */
+    private void innerTest(
+            String name, LogChangelogMode changelogMode, LogConsistency consistency, boolean keyed)
+            throws Exception {
+        Context context =
+                testContext(name, getBootstrapServers(), changelogMode, consistency, keyed);
+
+        KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
+        KafkaLogSourceProvider sourceProvider =
+                factory.createSourceProvider(context, SOURCE_CONTEXT);
+
+        factory.onCreateTable(context, 3);
+        try {
+            // transactional need to commit
+            enableCheckpoint();
+
+            // Step 1.1: write the first batch and remember its sink under a fresh id.
+            String firstSinkId = UUID.randomUUID().toString();
+            env.fromElements(
+                            testRecord(true, 2, 1, 2, RowKind.DELETE),
+                            testRecord(true, 1, 3, 4, RowKind.INSERT),
+                            testRecord(true, 0, 5, 6, RowKind.INSERT),
+                            testRecord(true, 0, 7, 8, RowKind.INSERT))
+                    .sinkTo(new TestOffsetsLogSink<>(sinkProvider, firstSinkId));
+            env.execute();
+
+            // Step 1.2: read everything back without start offsets.
+            List<RowData> firstBatch =
+                    env.fromSource(
+                                    sourceProvider.createSource(null),
+                                    WatermarkStrategy.noWatermarks(),
+                                    "source")
+                            .executeAndCollect(4);
+            firstBatch.sort(Comparator.comparingInt(o -> o.getInt(0)));
+
+            // In upsert mode the delete record carries no value.
+            Integer deletedValue = changelogMode == LogChangelogMode.UPSERT ? null : 2;
+            assertRow(firstBatch.get(0), RowKind.DELETE, 1, deletedValue);
+
+            // The three inserts follow in key order.
+            assertRow(firstBatch.get(1), RowKind.INSERT, 3, 4);
+            assertRow(firstBatch.get(2), RowKind.INSERT, 5, 6);
+            assertRow(firstBatch.get(3), RowKind.INSERT, 7, 8);
+
+            // Step 2.1: write the second batch.
+            env.fromElements(
+                            testRecord(true, 0, 9, 10, RowKind.INSERT),
+                            testRecord(true, 1, 11, 12, RowKind.INSERT),
+                            testRecord(true, 2, 13, 14, RowKind.INSERT))
+                    .sinkTo(new TestOffsetsLogSink<>(sinkProvider, UUID.randomUUID().toString()));
+            env.execute();
+
+            // Step 2.2: read from the offsets of the first sink, so only the second batch shows.
+            List<RowData> secondBatch =
+                    env.fromSource(
+                                    sourceProvider.createSource(
+                                            TestOffsetsLogSink.drainOffsets(firstSinkId)),
+                                    WatermarkStrategy.noWatermarks(),
+                                    "source")
+                            .executeAndCollect(3);
+            secondBatch.sort(Comparator.comparingInt(o -> o.getInt(0)));
+            assertRow(secondBatch.get(0), RowKind.INSERT, 9, 10);
+            assertRow(secondBatch.get(1), RowKind.INSERT, 11, 12);
+            assertRow(secondBatch.get(2), RowKind.INSERT, 13, 14);
+        } finally {
+            factory.onDropTable(context);
+        }
+    }
+
+    /** Turns on checkpointing (every 1000 ms), including checkpoints after tasks finish. */
+    private void enableCheckpoint() {
+        Configuration configuration = new Configuration();
+        configuration.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        env.configure(configuration);
+        env.enableCheckpointing(1000);
+    }
+
+    /** Asserts the row kind and the two int fields of {@code row}; {@code null} means SQL NULL. */
+    private void assertRow(RowData row, RowKind rowKind, Integer k, Integer v) {
+        Integer actualKey = row.isNullAt(0) ? null : row.getInt(0);
+        Integer actualValue = row.isNullAt(1) ? null : row.getInt(1);
+        Assert.assertEquals(rowKind, row.getRowKind());
+        Assert.assertEquals(k, actualKey);
+        Assert.assertEquals(v, actualValue);
+    }
+}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
new file mode 100644
index 0000000..c8461a0
--- /dev/null
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testContext;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link KafkaLogSerializationSchema} and {@link KafkaLogKeyedDeserializationSchema}.
+ *
+ * <p>Every check serializes one record, feeds the produced Kafka record into the deserializer and
+ * compares the resulting row with the input.
+ */
+public class KafkaLogSerializationTest {
+
+    private static final String TOPIC = "my_topic";
+
+    /** Row kinds exercised for every mode, in the order in which they are checked. */
+    private static final RowKind[] ROW_KINDS = {
+        RowKind.INSERT, RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER, RowKind.DELETE
+    };
+
+    @Test
+    public void testKeyed() throws Exception {
+        checkKeyed(LogChangelogMode.AUTO, 1, 3, 5);
+        checkKeyed(LogChangelogMode.UPSERT, 3, 6, 9);
+        checkKeyed(LogChangelogMode.ALL, 2, 5, 3);
+    }
+
+    @Test
+    public void testNonKeyedUpsert() {
+        Assertions.assertThrows(
+                IllegalArgumentException.class,
+                () -> checkNonKeyed(LogChangelogMode.UPSERT, 3, 6, 9));
+    }
+
+    @Test
+    public void testNonKeyed() throws Exception {
+        checkNonKeyed(LogChangelogMode.AUTO, 1, 3, 5);
+        checkNonKeyed(LogChangelogMode.ALL, 2, 5, 3);
+    }
+
+    /** Runs the round trip for a keyed table once per row kind. */
+    private void checkKeyed(LogChangelogMode mode, int bucket, int key, int value)
+            throws Exception {
+        for (RowKind kind : ROW_KINDS) {
+            check(mode, true, bucket, key, value, kind);
+        }
+    }
+
+    /** Runs the round trip for a non-keyed table once per row kind. */
+    private void checkNonKeyed(LogChangelogMode mode, int bucket, int key, int value)
+            throws Exception {
+        for (RowKind kind : ROW_KINDS) {
+            check(mode, false, bucket, key, value, kind);
+        }
+    }
+
+    /**
+     * Serializes one record and deserializes it again, then verifies the partition, the row kind,
+     * the key and the value of the result.
+     */
+    private void check(
+            LogChangelogMode mode, boolean keyed, int bucket, int key, int value, RowKind rowKind)
+            throws Exception {
+        KafkaLogSerializationSchema serializer =
+                createTestSerializationSchema(testContext("", mode, keyed));
+        serializer.open(null, null);
+        KafkaRecordDeserializationSchema<RowData> deserializer =
+                createTestDeserializationSchema(testContext("", mode, keyed));
+        deserializer.open(null);
+
+        SinkRecord input = testRecord(keyed, bucket, key, value, rowKind);
+        ProducerRecord<byte[], byte[]> produced = serializer.serialize(input, null, null);
+
+        // The bucket of the record decides the Kafka partition.
+        assertThat(produced.partition().intValue()).isEqualTo(bucket);
+
+        AtomicReference<RowData> collected = new AtomicReference<>();
+        Collector<RowData> singleRowCollector =
+                new Collector<RowData>() {
+                    @Override
+                    public void collect(RowData element) {
+                        // Exactly one row is expected per Kafka record.
+                        if (collected.get() != null) {
+                            throw new RuntimeException();
+                        }
+                        collected.set(element);
+                    }
+
+                    @Override
+                    public void close() {}
+                };
+        deserializer.deserialize(toConsumerRecord(produced), singleRowCollector);
+        RowData row = collected.get();
+
+        // Updates come back flattened: UPDATE_BEFORE as DELETE, UPDATE_AFTER as INSERT.
+        RowKind expectedKind;
+        switch (rowKind) {
+            case UPDATE_BEFORE:
+                expectedKind = RowKind.DELETE;
+                break;
+            case UPDATE_AFTER:
+                expectedKind = RowKind.INSERT;
+                break;
+            default:
+                expectedKind = rowKind;
+                break;
+        }
+        assertThat(row.getRowKind()).isEqualTo(expectedKind);
+        assertThat(row.getInt(0)).isEqualTo(key);
+
+        // Only a keyed, non-ALL mode delete is expected to come back without its value.
+        boolean valueExpected =
+                row.getRowKind() == RowKind.INSERT || mode == LogChangelogMode.ALL || !keyed;
+        if (valueExpected) {
+            assertThat(row.getInt(1)).isEqualTo(value);
+        } else {
+            assertThat(row.isNullAt(1)).isTrue();
+        }
+    }
+
+    /** Wraps the produced record into a consumer record of {@link #TOPIC} at offset 0. */
+    private ConsumerRecord<byte[], byte[]> toConsumerRecord(ProducerRecord<byte[], byte[]> record) {
+        return new ConsumerRecord<>(TOPIC, record.partition(), 0, record.key(), record.value());
+    }
+
+    private static KafkaLogSerializationSchema createTestSerializationSchema(
+            DynamicTableFactory.Context context) {
+        KafkaLogSinkProvider sinkProvider =
+                discoverKafkaLogFactory()
+                        .createSinkProvider(context, KafkaLogTestUtils.SINK_CONTEXT);
+        return sinkProvider.createSerializationSchema();
+    }
+
+    private static KafkaRecordDeserializationSchema<RowData> createTestDeserializationSchema(
+            DynamicTableFactory.Context context) {
+        KafkaLogSourceProvider sourceProvider =
+                discoverKafkaLogFactory()
+                        .createSourceProvider(context, KafkaLogTestUtils.SOURCE_CONTEXT);
+        return sourceProvider.createDeserializationSchema();
+    }
+}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
new file mode 100644
index 0000000..0fad212
--- /dev/null
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
+import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
+
+/**
+ * Utils for the test of {@link KafkaLogStoreFactory}.
+ *
+ * <p>Only static members are provided; the class is not meant to be instantiated.
+ */
+public class KafkaLogTestUtils {
+
+    /**
+     * Source context for tests: type information is created through {@link InternalTypeInfo} and
+     * data structure converters are delegated to {@link ScanRuntimeProviderContext#INSTANCE}.
+     */
+    static final LogStoreTableFactory.SourceContext SOURCE_CONTEXT =
+            new LogStoreTableFactory.SourceContext() {
+                @Override
+                public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
+                    return createTypeInformation(
+                            TypeConversions.fromDataToLogicalType(producedDataType));
+                }
+
+                @Override
+                public <T> TypeInformation<T> createTypeInformation(
+                        LogicalType producedLogicalType) {
+                    return InternalTypeInfo.of(producedLogicalType);
+                }
+
+                @Override
+                public DynamicTableSource.DataStructureConverter createDataStructureConverter(
+                        DataType producedDataType) {
+                    return ScanRuntimeProviderContext.INSTANCE.createDataStructureConverter(
+                            producedDataType);
+                }
+            };
+
+    /**
+     * Sink context for tests: always reports itself as unbounded, creates type information through
+     * {@link InternalTypeInfo} and delegates data structure converters to a fresh {@link
+     * SinkRuntimeProviderContext}.
+     */
+    static final LogStoreTableFactory.SinkContext SINK_CONTEXT =
+            new LogStoreTableFactory.SinkContext() {
+
+                @Override
+                public boolean isBounded() {
+                    return false;
+                }
+
+                @Override
+                public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
+                    return createTypeInformation(
+                            TypeConversions.fromDataToLogicalType(producedDataType));
+                }
+
+                @Override
+                public <T> TypeInformation<T> createTypeInformation(
+                        LogicalType producedLogicalType) {
+                    return InternalTypeInfo.of(producedLogicalType);
+                }
+
+                @Override
+                public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+                        DataType producedDataType) {
+                    return new SinkRuntimeProviderContext(isBounded())
+                            .createDataStructureConverter(producedDataType);
+                }
+            };
+
+    /** Utility class with static members only; prevents accidental instantiation. */
+    private KafkaLogTestUtils() {}
+
+    /**
+     * Looks up the log store factory registered under {@link KafkaLogStoreFactory#IDENTIFIER}
+     * using the context class loader of the current thread.
+     */
+    static KafkaLogStoreFactory discoverKafkaLogFactory() {
+        return (KafkaLogStoreFactory)
+                LogStoreTableFactory.discoverLogStoreFactory(
+                        Thread.currentThread().getContextClassLoader(),
+                        KafkaLogStoreFactory.IDENTIFIER);
+    }
+
+    /**
+     * Creates a table factory context for the table {@code catalog.database.<name>}.
+     *
+     * @param name object name of the table
+     * @param rowType row type of the table
+     * @param pk field indexes of the primary key; empty for a table without primary key
+     * @param options table options
+     */
+    private static DynamicTableFactory.Context createContext(
+            String name, RowType rowType, int[] pk, Map<String, String> options) {
+        return new FactoryUtil.DefaultDynamicTableContext(
+                ObjectIdentifier.of("catalog", "database", name),
+                KafkaLogTestUtils.createResolvedTable(options, rowType, pk),
+                Collections.emptyMap(),
+                new Configuration(),
+                Thread.currentThread().getContextClassLoader(),
+                false);
+    }
+
+    /**
+     * Creates a resolved catalog table whose columns are the physical fields of {@code rowType}.
+     *
+     * @param options table options
+     * @param rowType row type providing field names and types
+     * @param pk field indexes of the primary key; when empty, no primary key constraint is set
+     */
+    static ResolvedCatalogTable createResolvedTable(
+            Map<String, String> options, RowType rowType, int[] pk) {
+        List<String> fieldNames = rowType.getFieldNames();
+        List<DataType> fieldDataTypes =
+                rowType.getChildren().stream()
+                        .map(TypeConversions::fromLogicalToDataType)
+                        .collect(Collectors.toList());
+        CatalogTable origin =
+                CatalogTable.of(
+                        Schema.newBuilder().fromFields(fieldNames, fieldDataTypes).build(),
+                        null,
+                        Collections.emptyList(),
+                        options);
+        List<Column> resolvedColumns =
+                IntStream.range(0, fieldNames.size())
+                        .mapToObj(i -> Column.physical(fieldNames.get(i), fieldDataTypes.get(i)))
+                        .collect(Collectors.toList());
+        // The constraint stays null for tables without primary key.
+        UniqueConstraint constraint = null;
+        if (pk.length > 0) {
+            List<String> pkNames =
+                    Arrays.stream(pk).mapToObj(fieldNames::get).collect(Collectors.toList());
+            constraint = UniqueConstraint.primaryKey("pk", pkNames);
+        }
+        return new ResolvedCatalogTable(
+                origin, new ResolvedSchema(resolvedColumns, Collections.emptyList(), constraint));
+    }
+
+    /**
+     * Creates a context for a table named {@code table} with {@link
+     * LogConsistency#TRANSACTIONAL} consistency.
+     *
+     * @see #testContext(String, String, LogChangelogMode, LogConsistency, boolean)
+     */
+    static DynamicTableFactory.Context testContext(
+            String servers, LogChangelogMode changelogMode, boolean keyed) {
+        return testContext("table", servers, changelogMode, LogConsistency.TRANSACTIONAL, keyed);
+    }
+
+    /**
+     * Creates a context for a table with two INT fields.
+     *
+     * @param name object name of the table
+     * @param servers value of the bootstrap servers option
+     * @param changelogMode value of the changelog mode option
+     * @param consistency value of the consistency option
+     * @param keyed whether the first field is the primary key; otherwise there is no primary key
+     */
+    static DynamicTableFactory.Context testContext(
+            String name,
+            String servers,
+            LogChangelogMode changelogMode,
+            LogConsistency consistency,
+            boolean keyed) {
+        Map<String, String> options = new HashMap<>();
+        options.put(CHANGELOG_MODE.key(), changelogMode.toString());
+        options.put(CONSISTENCY.key(), consistency.toString());
+        options.put(BOOTSTRAP_SERVERS.key(), servers);
+        return createContext(
+                name,
+                RowType.of(new IntType(), new IntType()),
+                keyed ? new int[] {0} : new int[0],
+                options);
+    }
+
+    /**
+     * Creates a sink record for the given bucket whose row is {@code (key, value)} with the given
+     * row kind. The first constructor argument is always the empty row; the third one is a row
+     * containing {@code key} when {@code keyed} is true and the empty row otherwise.
+     */
+    static SinkRecord testRecord(boolean keyed, int bucket, int key, int value, RowKind rowKind) {
+        return new SinkRecord(
+                EMPTY_ROW,
+                bucket,
+                keyed ? row(key) : EMPTY_ROW,
+                GenericRowData.ofKind(rowKind, key, value));
+    }
+}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
new file mode 100644
index 0000000..dbe7897
--- /dev/null
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for Kafka Table IT Cases.
+ *
+ * <p>Starts a shared {@link KafkaContainer} as a class rule, creates a fresh stream and table
+ * environment before each test and periodically logs the state of the Kafka topics while a test
+ * is running.
+ */
+public abstract class KafkaTableTestBase extends AbstractTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTableTestBase.class);
+
+    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+    private static final Network NETWORK = Network.newNetwork();
+    private static final int ZK_TIMEOUT_MILLIS = 30000;
+
+    @ClassRule
+    public static final KafkaContainer KAFKA_CONTAINER =
+            new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) {
+                @Override
+                protected void doStart() {
+                    super.doStart();
+                    // Forward the container output only when it would actually be logged.
+                    if (LOG.isInfoEnabled()) {
+                        this.followOutput(new Slf4jLogConsumer(LOG));
+                    }
+                }
+            }.withEmbeddedZookeeper()
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
+                    .withEnv(
+                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
+                            String.valueOf(Duration.ofHours(2).toMillis()))
+                    // Disable log deletion to prevent records from being deleted during test run
+                    .withEnv("KAFKA_LOG_RETENTION_MS", "-1");
+
+    protected StreamExecutionEnvironment env;
+    protected StreamTableEnvironment tEnv;
+
+    // Timer for scheduling logging task if the test hangs
+    private final Timer loggingTimer = new Timer("Debug Logging Timer");
+
+    /** Creates the environments without restart strategy and starts the periodic debug logging. */
+    @Before
+    public void setup() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+
+        // Probe Kafka broker status per 30 seconds
+        scheduleTimeoutLogger(
+                Duration.ofSeconds(30),
+                () -> {
+                    // List all non-internal topics
+                    final Map<String, TopicDescription> topicDescriptions =
+                            describeExternalTopics();
+                    LOG.info("Current existing topics: {}", topicDescriptions.keySet());
+
+                    // Log status of topics
+                    logTopicPartitionStatus(topicDescriptions);
+                });
+    }
+
+    /** Stops the periodic debug logging. */
+    @After
+    public void after() {
+        // Cancel timer for debug logging
+        cancelTimeoutLogger();
+    }
+
+    /**
+     * Returns a new set of client properties pointing at the test container.
+     *
+     * <p>Note that some values are stored as {@code Boolean}/{@code Integer} objects rather than
+     * strings, so they are not visible through {@link Properties#getProperty(String)}.
+     */
+    public Properties getStandardProps() {
+        Properties standardProps = new Properties();
+        standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
+        standardProps.put("group.id", "flink-tests");
+        standardProps.put("enable.auto.commit", false);
+        standardProps.put("auto.offset.reset", "earliest");
+        standardProps.put("max.partition.fetch.bytes", 256);
+        standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
+        standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
+        return standardProps;
+    }
+
+    /** Returns the bootstrap servers of the test container. */
+    public String getBootstrapServers() {
+        return KAFKA_CONTAINER.getBootstrapServers();
+    }
+
+    // ------------------------ For Debug Logging Purpose ----------------------------------
+
+    /**
+     * Runs the given action on the logging timer, first immediately and then repeatedly with the
+     * given period. Any exception of the action is rethrown as a {@link RuntimeException}.
+     */
+    private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) {
+        TimerTask timeoutLoggerTask =
+                new TimerTask() {
+                    @Override
+                    public void run() {
+                        try {
+                            loggingAction.run();
+                        } catch (Exception e) {
+                            throw new RuntimeException("Failed to execute logging action", e);
+                        }
+                    }
+                };
+        loggingTimer.schedule(timeoutLoggerTask, 0L, period.toMillis());
+    }
+
+    /** Cancels the logging timer. */
+    private void cancelTimeoutLogger() {
+        loggingTimer.cancel();
+    }
+
+    /**
+     * Describes all non-internal topics of the test cluster.
+     *
+     * @return the descriptions keyed by topic name
+     * @throws RuntimeException if listing or describing the topics fails
+     */
+    private Map<String, TopicDescription> describeExternalTopics() {
+        try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
+            final List<String> topics =
+                    adminClient.listTopics().listings().get().stream()
+                            .filter(listing -> !listing.isInternal())
+                            .map(TopicListing::name)
+                            .collect(Collectors.toList());
+
+            return adminClient.describeTopics(topics).all().get();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to list Kafka topics", e);
+        }
+    }
+
+    /**
+     * Logs the beginning and end offset of every partition of the given topics.
+     *
+     * @param topicDescriptions descriptions of the topics to inspect, keyed by topic name
+     */
+    private void logTopicPartitionStatus(Map<String, TopicDescription> topicDescriptions) {
+        final Properties properties = getStandardProps();
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests-debugging");
+        properties.setProperty(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                StringDeserializer.class.getCanonicalName());
+        properties.setProperty(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                StringDeserializer.class.getCanonicalName());
+        final List<TopicPartition> partitions = new ArrayList<>();
+        topicDescriptions.forEach(
+                (topic, description) ->
+                        description
+                                .partitions()
+                                .forEach(
+                                        tpInfo ->
+                                                partitions.add(
+                                                        new TopicPartition(
+                                                                topic, tpInfo.partition()))));
+        // This method is invoked periodically, so the consumer must be closed after the offset
+        // lookups; otherwise every probe would leave an open client behind.
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
+            final Map<TopicPartition, Long> beginningOffsets =
+                    consumer.beginningOffsets(partitions);
+            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
+            partitions.forEach(
+                    partition ->
+                            LOG.info(
+                                    "TopicPartition \"{}\": starting offset: {}, stopping offset: {}",
+                                    partition,
+                                    beginningOffsets.get(partition),
+                                    endOffsets.get(partition)));
+        }
+    }
+}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
new file mode 100644
index 0000000..e06d527
--- /dev/null
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.store.log.LogInitContext;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.sink.SinkRecord;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+/** Test kafka {@link Sink}. */
+public class TestOffsetsLogSink<
+                WriterT extends
+                        StatefulSinkWriter<SinkRecord, WriterStateT>
+                                & PrecommittingSinkWriter<SinkRecord, CommT>,
+                CommT,
+                WriterStateT>
+        implements StatefulSink<SinkRecord, WriterStateT>,
+                TwoPhaseCommittingSink<SinkRecord, CommT> {
+
+    private static final Map<String, Map<Integer, Long>> GLOBAL_OFFSETS = new ConcurrentHashMap<>();
+
+    private final LogSinkProvider sinkProvider;
+    private final String uuid;
+    private final Sink<SinkRecord> sink;
+
+    public TestOffsetsLogSink(LogSinkProvider sinkProvider, String uuid) {
+        this.sinkProvider = sinkProvider;
+        this.uuid = uuid;
+        this.sink = sinkProvider.createSink();
+    }
+
+    public static Map<Integer, Long> drainOffsets(String uuid) {
+        return GLOBAL_OFFSETS.remove(uuid);
+    }
+
+    @Override
+    public WriterT createWriter(InitContext initContext) throws IOException {
+        return (WriterT) sink.createWriter(wrapContext(initContext));
+    }
+
+    private InitContext wrapContext(InitContext initContext) {
+        Consumer<?> consumer =
+                sinkProvider.createMetadataConsumer(
+                        (bucket, offset) -> {
+                            Map<Integer, Long> offsets =
+                                    GLOBAL_OFFSETS.computeIfAbsent(
+                                            uuid, k -> new ConcurrentHashMap<>());
+                            long nextOffset = offset + 1;
+                            offsets.compute(
+                                    bucket,
+                                    (k, v) -> v == null ? nextOffset : Math.max(v, nextOffset));
+                        });
+        return new LogInitContext(initContext, consumer);
+    }
+
+    @Override
+    public StatefulSinkWriter<SinkRecord, WriterStateT> restoreWriter(
+            InitContext initContext, Collection<WriterStateT> collection) throws IOException {
+        return ((StatefulSink<SinkRecord, WriterStateT>) sink)
+                .restoreWriter(wrapContext(initContext), collection);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
+        return ((StatefulSink<SinkRecord, WriterStateT>) sink).getWriterStateSerializer();
+    }
+
+    @Override
+    public Committer<CommT> createCommitter() throws IOException {
+        return ((TwoPhaseCommittingSink<SinkRecord, CommT>) sink).createCommitter();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
+        return ((TwoPhaseCommittingSink<SinkRecord, CommT>) sink).getCommittableSerializer();
+    }
+}
diff --git a/flink-table-store-kafka/src/test/resources/log4j2-test.properties b/flink-table-store-kafka/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..863665c
--- /dev/null
+++ b/flink-table-store-kafka/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set the root logger level to OFF so that the build logs are not flooded.
+# Set it manually to INFO for debugging purposes.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+logger.kafka.name = kafka
+logger.kafka.level = OFF
+logger.kafka2.name = state.change
+logger.kafka2.level = OFF
+
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = OFF
+logger.I0Itec.name = org.I0Itec
+logger.I0Itec.level = OFF
diff --git a/pom.xml b/pom.xml
index 5046318..7d048b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@ under the License.
     <modules>
         <module>flink-table-store-core</module>
         <module>flink-table-store-connector</module>
+        <module>flink-table-store-kafka</module>
     </modules>
 
     <properties>
@@ -61,6 +62,7 @@ under the License.
         <scala.binary.version>2.12</scala.binary.version>
         <slf4j.version>1.7.15</slf4j.version>
         <log4j.version>2.17.1</log4j.version>
+        <junit4.version>4.13.2</junit4.version>
         <junit5.version>5.8.1</junit5.version>
         <spotless.version>2.4.2</spotless.version>
         <target.java.version>1.8</target.java.version>
@@ -68,6 +70,7 @@ under the License.
         <javax.annotation-api.version>1.3.2</javax.annotation-api.version>
         <flink.forkCount>1C</flink.forkCount>
         <flink.reuseForks>true</flink.reuseForks>
+        <testcontainers.version>1.16.2</testcontainers.version>
 
         <!-- Can be set to any value to reproduce a specific build. -->
         <test.randomization.seed/>