Posted to commits@flink.apache.org by lz...@apache.org on 2022/02/18 10:12:18 UTC
[flink-table-store] branch master updated: [FLINK-26103] Introduce log store and implement Kafka log store
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 54ca3dc [FLINK-26103] Introduce log store and implement Kafka log store
54ca3dc is described below
commit 54ca3dc2f6ea3b40beb24ada3539f6db0bce989f
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Fri Feb 18 18:12:13 2022 +0800
[FLINK-26103] Introduce log store and implement Kafka log store
This closes #20
---
.../store/connector/sink/StoreSinkWriter.java | 2 +-
.../flink/table/store/log/LogInitContext.java | 87 +++++++++
.../apache/flink/table/store/log/LogOptions.java | 192 ++++++++++++++++++
.../flink/table/store/log/LogSinkProvider.java | 52 +++++
.../flink/table/store/log/LogSourceProvider.java | 43 ++++
.../table/store/log/LogStoreTableFactory.java | 132 +++++++++++++
.../apache/flink/table/store/sink/SinkRecord.java | 13 +-
.../table/store/sink/SinkRecordConverter.java | 27 ++-
.../flink/table/store/utils/OptionsUtils.java | 32 +++
flink-table-store-kafka/pom.xml | 160 +++++++++++++++
.../kafka/KafkaLogKeyedDeserializationSchema.java | 106 ++++++++++
.../flink/table/store/kafka/KafkaLogOptions.java | 32 +++
.../store/kafka/KafkaLogSerializationSchema.java | 86 ++++++++
.../table/store/kafka/KafkaLogSinkProvider.java | 106 ++++++++++
.../table/store/kafka/KafkaLogSourceProvider.java | 144 ++++++++++++++
.../table/store/kafka/KafkaLogStoreFactory.java | 208 ++++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 16 ++
.../flink/table/store/kafka/KafkaLogITCase.java | 217 +++++++++++++++++++++
.../store/kafka/KafkaLogSerializationTest.java | 146 ++++++++++++++
.../flink/table/store/kafka/KafkaLogTestUtils.java | 190 ++++++++++++++++++
.../table/store/kafka/KafkaTableTestBase.java | 194 ++++++++++++++++++
.../table/store/kafka/TestOffsetsLogSink.java | 105 ++++++++++
.../src/test/resources/log4j2-test.properties | 38 ++++
pom.xml | 3 +
24 files changed, 2312 insertions(+), 19 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
index 8cc88a1..dc8c993 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkWriter.java
@@ -97,7 +97,7 @@ public class StoreSinkWriter<WriterStateT>
}
private void writeToFileStore(RecordWriter writer, SinkRecord record) throws Exception {
- switch (record.rowKind()) {
+ switch (record.row().getRowKind()) {
case INSERT:
case UPDATE_AFTER:
if (record.key().getArity() == 0) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java
new file mode 100644
index 0000000..a5983a9
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogInitContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.function.Consumer;
+
+/** An {@link InitContext} with {@link InitContext#metadataConsumer()}. */
+public class LogInitContext implements InitContext {
+
+ private final InitContext context;
+ private final Consumer<?> metaConsumer;
+
+ public LogInitContext(InitContext context, Consumer<?> metaConsumer) {
+ this.context = context;
+ this.metaConsumer = metaConsumer;
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ return context.getUserCodeClassLoader();
+ }
+
+ @Override
+ public MailboxExecutor getMailboxExecutor() {
+ return context.getMailboxExecutor();
+ }
+
+ @Override
+ public ProcessingTimeService getProcessingTimeService() {
+ return context.getProcessingTimeService();
+ }
+
+ @Override
+ public int getSubtaskId() {
+ return context.getSubtaskId();
+ }
+
+ @Override
+ public int getNumberOfParallelSubtasks() {
+ return context.getNumberOfParallelSubtasks();
+ }
+
+ @Override
+ public SinkWriterMetricGroup metricGroup() {
+ return context.metricGroup();
+ }
+
+ @Override
+ public OptionalLong getRestoredCheckpointId() {
+ return context.getRestoredCheckpointId();
+ }
+
+ @Override
+ public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
+ return context.asSerializationSchemaInitializationContext();
+ }
+
+ @Override
+ public Optional<Consumer<?>> metadataConsumer() {
+ return Optional.of(metaConsumer);
+ }
+}
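For context, a hedged sketch of how this wrapper is meant to be used; the class below is illustrative only and not part of this commit. A log sink can re-wrap the Flink-provided InitContext so that the inner Kafka writer reports per-record metadata through a WriteCallback (defined later in this commit):

    import org.apache.flink.api.connector.sink2.Sink;
    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.table.store.log.LogInitContext;
    import org.apache.flink.table.store.log.LogSinkProvider;
    import org.apache.flink.table.store.sink.SinkRecord;

    import java.io.IOException;
    import java.util.function.Consumer;

    /** Illustrative only: delegates writer creation to a LogSinkProvider. */
    public class CallbackAwareSink implements Sink<SinkRecord> {

        private final LogSinkProvider provider;
        private final LogSinkProvider.WriteCallback callback;

        public CallbackAwareSink(
                LogSinkProvider provider, LogSinkProvider.WriteCallback callback) {
            this.provider = provider;
            this.callback = callback;
        }

        @Override
        public SinkWriter<SinkRecord> createWriter(InitContext context) throws IOException {
            // Route per-record metadata (e.g. Kafka RecordMetadata) to the callback.
            Consumer<?> metaConsumer = provider.createMetadataConsumer(callback);
            return provider.createSink().createWriter(new LogInitContext(context, metaConsumer));
        }
    }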
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
new file mode 100644
index 0000000..3f36945
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogOptions.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.InlineElement;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
+
+/** Options for log store. */
+public class LogOptions {
+
+ public static final ConfigOption<LogStartupMode> SCAN =
+ ConfigOptions.key("scan")
+ .enumType(LogStartupMode.class)
+ .defaultValue(LogStartupMode.FULL)
+ .withDescription(
+ Description.builder()
+ .text("Specify the startup mode for log consumer.")
+ .linebreak()
+ .list(formatEnumOption(LogStartupMode.FULL))
+ .list(formatEnumOption(LogStartupMode.LATEST))
+ .list(formatEnumOption(LogStartupMode.FROM_TIMESTAMP))
+ .build());
+
+ public static final ConfigOption<Long> SCAN_TIMESTAMP_MILLS =
+ ConfigOptions.key("scan.timestamp-millis")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional timestamp used in case of \"from-timestamp\" scan mode");
+
+ public static final ConfigOption<Duration> RETENTION =
+ ConfigOptions.key("retention")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "It means how long changes log will be kept. The default value is from the log system cluster.");
+
+ public static final ConfigOption<LogConsistency> CONSISTENCY =
+ ConfigOptions.key("consistency")
+ .enumType(LogConsistency.class)
+ .defaultValue(LogConsistency.TRANSACTIONAL)
+ .withDescription(
+ Description.builder()
+ .text("Specify the log consistency mode for table.")
+ .linebreak()
+ .list(
+ formatEnumOption(LogConsistency.TRANSACTIONAL),
+ formatEnumOption(LogConsistency.EVENTUAL))
+ .build());
+
+ public static final ConfigOption<LogChangelogMode> CHANGELOG_MODE =
+ ConfigOptions.key("changelog-mode")
+ .enumType(LogChangelogMode.class)
+ .defaultValue(LogChangelogMode.AUTO)
+ .withDescription(
+ Description.builder()
+ .text("Specify the log changelog mode for table.")
+ .linebreak()
+ .list(
+ formatEnumOption(LogChangelogMode.AUTO),
+ formatEnumOption(LogChangelogMode.ALL),
+ formatEnumOption(LogChangelogMode.UPSERT))
+ .build());
+
+ public static final ConfigOption<String> KEY_FORMAT =
+ ConfigOptions.key("key.format")
+ .stringType()
+ .defaultValue("json")
+ .withDescription(
+ "Specify the key message format of log system with primary key.");
+
+ public static final ConfigOption<String> FORMAT =
+ ConfigOptions.key("format")
+ .stringType()
+ .defaultValue("debezium-json")
+ .withDescription("Specify the message format of log system.");
+
+ /** Specifies the startup mode for log consumer. */
+ public enum LogStartupMode implements DescribedEnum {
+ FULL(
+ "full",
+ "Perform a snapshot on the table upon first startup,"
+ + " and continue to read the latest changes."),
+
+ LATEST("latest", "Start from the latest."),
+
+ FROM_TIMESTAMP("from-timestamp", "Start from user-supplied timestamp.");
+
+ private final String value;
+ private final String description;
+
+ LogStartupMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+
+ /** Specifies the log consistency mode for table. */
+ public enum LogConsistency implements DescribedEnum {
+ TRANSACTIONAL(
+ "transactional",
+ "Only the data after the checkpoint can be seen by readers, the latency depends on checkpoint interval."),
+
+ EVENTUAL(
+ "eventual",
+ "Immediate data visibility, you may see some intermediate states, "
+ + "but eventually the right results will be produced, only works for table with primary key.");
+
+ private final String value;
+ private final String description;
+
+ LogConsistency(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+
+ /** Specifies the log changelog mode for table. */
+ public enum LogChangelogMode implements DescribedEnum {
+ AUTO("auto", "Upsert for table with primary key, all for table without primary key.."),
+
+ ALL("all", "The log system stores all changes including UPDATE_BEFORE."),
+
+ UPSERT(
+ "upsert",
+ "The log system does not store the UPDATE_BEFORE changes, the log consumed job"
+ + " will automatically add the normalized node, relying on the state"
+ + " to generate the required update_before.");
+
+ private final String value;
+ private final String description;
+
+ LogChangelogMode(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
+}
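Taken together, these are plain Flink ConfigOptions; a minimal sketch of reading them from a Configuration (the option values below are assumptions). Note that enum options are matched against the enum's toString(), so the lowercase table-option values parse directly:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.log.LogOptions;
    import org.apache.flink.table.store.log.LogOptions.LogStartupMode;

    public class LogOptionsExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString("scan", "from-timestamp");
            conf.setString("scan.timestamp-millis", "1645171200000");

            LogStartupMode mode = conf.get(LogOptions.SCAN);
            Long timestamp = conf.get(LogOptions.SCAN_TIMESTAMP_MILLS);
            // -> FROM_TIMESTAMP @ 1645171200000
            System.out.println(mode + " @ " + timestamp);
        }
    }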
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
new file mode 100644
index 0000000..8a091fa
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSinkProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.table.store.sink.SinkRecord;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+/** A {@link Serializable} sink provider for the log store. */
+public interface LogSinkProvider extends Serializable {
+
+ /** Creates a {@link Sink} instance. */
+ Sink<SinkRecord> createSink();
+
+ /**
+ * Creates a metadata consumer for {@link Sink.InitContext#metadataConsumer()} from a {@link
+ * WriteCallback}.
+ */
+ Consumer<?> createMetadataConsumer(WriteCallback callback);
+
+ /**
+ * A callback interface that the user can implement to learn the offset of a bucket when a
+ * write request completes.
+ */
+ interface WriteCallback {
+
+ /**
+ * A callback method the user can implement to provide asynchronous handling of request
+ * completion. This method will be called when the record sent to the server has been
+ * acknowledged.
+ */
+ void onCompletion(int bucket, long offset);
+ }
+}
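A minimal WriteCallback sketch, assuming it may be invoked from a producer I/O thread, that keeps the latest acknowledged offset per bucket (the class itself is hypothetical):

    import org.apache.flink.table.store.log.LogSinkProvider.WriteCallback;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class OffsetTrackingCallback implements WriteCallback {

        private final Map<Integer, Long> bucketOffsets = new ConcurrentHashMap<>();

        @Override
        public void onCompletion(int bucket, long offset) {
            // Keep the highest acknowledged offset per bucket.
            bucketOffsets.merge(bucket, offset, Math::max);
        }

        public Map<Integer, Long> snapshot() {
            return new ConcurrentHashMap<>(bucketOffsets);
        }
    }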
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSourceProvider.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSourceProvider.java
new file mode 100644
index 0000000..94dd604
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogSourceProvider.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.base.source.hybrid.HybridSource;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A {@link Serializable} source provider for the log store.
+ *
+ * <p>This class is serializable, so it can be wrapped as a {@link HybridSource.SourceFactory}.
+ */
+public interface LogSourceProvider extends Serializable {
+
+ /**
+ * Creates a {@link Source} instance.
+ *
+ * @param bucketOffsets optional; configure it if you need to specify per-bucket startup offsets.
+ */
+ Source<RowData, ?, ?> createSource(@Nullable Map<Integer, Long> bucketOffsets);
+}
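A hedged usage sketch with the DataStream API (the job wiring is an assumption, not part of this commit); passing null lets the 'scan' option decide the startup position:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.log.LogSourceProvider;

    public class LogSourceExample {
        public static DataStream<RowData> readLog(
                StreamExecutionEnvironment env, LogSourceProvider provider) {
            // null offsets -> startup position decided by the 'scan' option.
            return env.fromSource(
                    provider.createSource(null), WatermarkStrategy.noWatermarks(), "log-store");
        }
    }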
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
new file mode 100644
index 0000000..35c435d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/log/LogStoreTableFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.log;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Base interface for configuring a default log table connector. The log table is used by the
+ * managed table factory.
+ *
+ * <p>Log tables only process unbounded data. They support streaming reading and streaming
+ * writing.
+ */
+public interface LogStoreTableFactory extends DynamicTableFactory {
+
+ /** Notifies the listener that a table creation occurred. */
+ void onCreateTable(Context context, int numBucket);
+
+ /** Notifies the listener that a table drop occurred. */
+ void onDropTable(Context context);
+
+ /**
+ * Creates a {@link LogSourceProvider} instance from a {@link CatalogTable} and additional
+ * context information.
+ */
+ LogSourceProvider createSourceProvider(Context context, SourceContext sourceContext);
+
+ /**
+ * Creates a {@link LogSinkProvider} instance from a {@link CatalogTable} and additional context
+ * information.
+ */
+ LogSinkProvider createSinkProvider(Context context, SinkContext sinkContext);
+
+ /** Context for creating the runtime source. */
+ interface SourceContext extends DynamicTableSource.Context {}
+
+ /** Context for creating the runtime sink. */
+ interface SinkContext extends DynamicTableSink.Context {}
+
+ // --------------------------------------------------------------------------------------------
+
+ static LogStoreTableFactory discoverLogStoreFactory(ClassLoader cl, String identifier) {
+ return FactoryUtil.discoverFactory(cl, LogStoreTableFactory.class, identifier);
+ }
+
+ static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(
+ TableFactoryHelper helper) {
+ DecodingFormat<DeserializationSchema<RowData>> format =
+ helper.discoverDecodingFormat(
+ DeserializationFormatFactory.class, LogOptions.KEY_FORMAT);
+ validateKeyFormat(format, helper.getOptions().get(LogOptions.KEY_FORMAT));
+ return format;
+ }
+
+ static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat(
+ TableFactoryHelper helper) {
+ EncodingFormat<SerializationSchema<RowData>> format =
+ helper.discoverEncodingFormat(
+ SerializationFormatFactory.class, LogOptions.KEY_FORMAT);
+ validateKeyFormat(format, helper.getOptions().get(LogOptions.KEY_FORMAT));
+ return format;
+ }
+
+ static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
+ TableFactoryHelper helper) {
+ DecodingFormat<DeserializationSchema<RowData>> format =
+ helper.discoverDecodingFormat(
+ DeserializationFormatFactory.class, LogOptions.FORMAT);
+ validateValueFormat(format, helper.getOptions().get(LogOptions.FORMAT));
+ return format;
+ }
+
+ static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
+ TableFactoryHelper helper) {
+ EncodingFormat<SerializationSchema<RowData>> format =
+ helper.discoverEncodingFormat(SerializationFormatFactory.class, LogOptions.FORMAT);
+ validateValueFormat(format, helper.getOptions().get(LogOptions.FORMAT));
+ return format;
+ }
+
+ static void validateKeyFormat(Format format, String name) {
+ if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ throw new ValidationException(
+ String.format(
+ "A key format should only deal with INSERT-only records. "
+ + "But %s has a changelog mode of %s.",
+ name, format.getChangelogMode()));
+ }
+ }
+
+ static void validateValueFormat(Format format, String name) {
+ if (!format.getChangelogMode().equals(ChangelogMode.all())) {
+ throw new ValidationException(
+ String.format(
+ "A value format should deal with all records. "
+ + "But %s has a changelog mode of %s.",
+ name, format.getChangelogMode()));
+ }
+ }
+}
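Factory discovery goes through Flink's standard Factory SPI; a small sketch, assuming the Kafka implementation added below is on the classpath under the identifier "kafka":

    import org.apache.flink.table.store.log.LogStoreTableFactory;

    public class DiscoveryExample {
        public static LogStoreTableFactory kafkaFactory() {
            // Resolved via the META-INF/services entry registered in this commit.
            return LogStoreTableFactory.discoverLogStoreFactory(
                    DiscoveryExample.class.getClassLoader(), "kafka");
        }
    }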
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
index 787e457..25b08c8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecord.java
@@ -24,27 +24,22 @@ import org.apache.flink.types.RowKind;
import static org.apache.flink.util.Preconditions.checkArgument;
-/** A sink records contains key, value and partition, bucket, row kind information. */
+/** A sink record contains key, row, partition and bucket information. */
public class SinkRecord {
private final BinaryRowData partition;
private final int bucket;
- private final RowKind rowKind;
-
private final BinaryRowData key;
private final RowData row;
- public SinkRecord(
- BinaryRowData partition, int bucket, RowKind rowKind, BinaryRowData key, RowData row) {
+ public SinkRecord(BinaryRowData partition, int bucket, BinaryRowData key, RowData row) {
checkArgument(partition.getRowKind() == RowKind.INSERT);
checkArgument(key.getRowKind() == RowKind.INSERT);
- checkArgument(row.getRowKind() == RowKind.INSERT);
this.partition = partition;
this.bucket = bucket;
- this.rowKind = rowKind;
this.key = key;
this.row = row;
}
@@ -64,8 +59,4 @@ public class SinkRecord {
public RowData row() {
return row;
}
-
- public RowKind rowKind() {
- return rowKind;
- }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
index 2f17fb9..55c1bf8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/sink/SinkRecordConverter.java
@@ -21,17 +21,18 @@ package org.apache.flink.table.store.sink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.Projection;
-import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.utils.ProjectionUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import java.util.stream.IntStream;
+
/** Converter for converting {@link RowData} to {@link SinkRecord}. */
public class SinkRecordConverter {
private final int numBucket;
- private final RowDataSerializer rowSerializer;
+ private final Projection<RowData, BinaryRowData> allProjection;
private final Projection<RowData, BinaryRowData> partProjection;
@@ -39,18 +40,30 @@ public class SinkRecordConverter {
public SinkRecordConverter(int numBucket, RowType inputType, int[] partitions, int[] keys) {
this.numBucket = numBucket;
- this.rowSerializer = new RowDataSerializer(inputType);
+ this.allProjection =
+ ProjectionUtils.newProjection(
+ inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
this.partProjection = ProjectionUtils.newProjection(inputType, partitions);
this.keyProjection = ProjectionUtils.newProjection(inputType, keys);
}
public SinkRecord convert(RowData row) {
- RowKind rowKind = row.getRowKind();
- row.setRowKind(RowKind.INSERT);
BinaryRowData partition = partProjection.apply(row);
BinaryRowData key = keyProjection.apply(row);
- int hash = key.getArity() == 0 ? rowSerializer.toBinaryRow(row).hashCode() : key.hashCode();
+ int hash = key.getArity() == 0 ? hashRow(row) : key.hashCode();
int bucket = Math.abs(hash % numBucket);
- return new SinkRecord(partition, bucket, rowKind, key, row);
+ return new SinkRecord(partition, bucket, key, row);
+ }
+
+ private int hashRow(RowData row) {
+ if (row instanceof BinaryRowData) {
+ RowKind rowKind = row.getRowKind();
+ row.setRowKind(RowKind.INSERT);
+ int hash = row.hashCode();
+ row.setRowKind(rowKind);
+ return hash;
+ } else {
+ return allProjection.apply(row).hashCode();
+ }
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/utils/OptionsUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/utils/OptionsUtils.java
new file mode 100644
index 0000000..3436e3c
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/utils/OptionsUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.TextElement;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/** Utils for options. */
+public class OptionsUtils {
+
+ public static TextElement formatEnumOption(DescribedEnum describedEnum) {
+ return text("\"%s\": %s", text(describedEnum.toString()), describedEnum.getDescription());
+ }
+}
diff --git a/flink-table-store-kafka/pom.xml b/flink-table-store-kafka/pom.xml
new file mode 100644
index 0000000..75e6811
--- /dev/null
+++ b/flink-table-store-kafka/pom.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>flink-table-store-parent</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-table-store-kafka</artifactId>
+ <name>Flink Table Store : Kafka</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- flink dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <!-- include 2.4 server for tests -->
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>2.4.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit4.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java
new file mode 100644
index 0000000..50485c6
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogKeyedDeserializationSchema.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.stream.IntStream;
+
+/** A {@link KafkaDeserializationSchema} for tables with primary key in the log store. */
+public class KafkaLogKeyedDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TypeInformation<RowData> producedType;
+ private final int fieldCount;
+ private final int[] primaryKey;
+ private final DeserializationSchema<RowData> keyDeserializer;
+ private final DeserializationSchema<RowData> valueDeserializer;
+ private final RowData.FieldGetter[] keyFieldGetters;
+
+ public KafkaLogKeyedDeserializationSchema(
+ DataType physicalType,
+ int[] primaryKey,
+ DeserializationSchema<RowData> keyDeserializer,
+ DeserializationSchema<RowData> valueDeserializer) {
+ this.primaryKey = primaryKey;
+ this.keyDeserializer = keyDeserializer;
+ this.valueDeserializer = valueDeserializer;
+ this.producedType = InternalTypeInfo.of(physicalType.getLogicalType());
+ this.fieldCount = physicalType.getChildren().size();
+ this.keyFieldGetters =
+ IntStream.range(0, primaryKey.length)
+ .mapToObj(
+ i ->
+ RowData.createFieldGetter(
+ physicalType
+ .getChildren()
+ .get(primaryKey[i])
+ .getLogicalType(),
+ i))
+ .toArray(RowData.FieldGetter[]::new);
+ }
+
+ @Override
+ public void open(DeserializationSchema.InitializationContext context) throws Exception {
+ keyDeserializer.open(context);
+ valueDeserializer.open(context);
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public RowData deserialize(ConsumerRecord<byte[], byte[]> record) {
+ throw new RuntimeException(
+ "Please invoke DeserializationSchema#deserialize(byte[], Collector<RowData>) instead.");
+ }
+
+ @Override
+ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> out)
+ throws Exception {
+ if (record.value() == null) {
+ RowData key = keyDeserializer.deserialize(record.key());
+ GenericRowData value = new GenericRowData(RowKind.DELETE, fieldCount);
+ for (int i = 0; i < primaryKey.length; i++) {
+ value.setField(primaryKey[i], keyFieldGetters[i].getFieldOrNull(key));
+ }
+ out.collect(value);
+ } else {
+ valueDeserializer.deserialize(record.value(), out);
+ }
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return producedType;
+ }
+}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
new file mode 100644
index 0000000..8882488
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for the Kafka log. */
+public class KafkaLogOptions {
+
+ public static final ConfigOption<String> BOOTSTRAP_SERVERS =
+ ConfigOptions.key("kafka.bootstrap.servers")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Required Kafka server connection string");
+}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
new file mode 100644
index 0000000..e9b4081
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSerializationSchema.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.types.RowKind;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/** A {@link KafkaRecordSerializationSchema} for tables in the log store. */
+public class KafkaLogSerializationSchema implements KafkaRecordSerializationSchema<SinkRecord> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String topic;
+ @Nullable private final SerializationSchema<RowData> keySerializer;
+ private final SerializationSchema<RowData> valueSerializer;
+ private final LogChangelogMode changelogMode;
+
+ public KafkaLogSerializationSchema(
+ String topic,
+ @Nullable SerializationSchema<RowData> keySerializer,
+ SerializationSchema<RowData> valueSerializer,
+ LogChangelogMode changelogMode) {
+ this.topic = topic;
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+ this.changelogMode = changelogMode;
+ if (changelogMode == LogChangelogMode.UPSERT && keySerializer == null) {
+ throw new IllegalArgumentException(
+ "Can not use upsert changelog mode for non-pk table.");
+ }
+ }
+
+ @Override
+ public void open(
+ SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
+ throws Exception {
+ if (keySerializer != null) {
+ keySerializer.open(context);
+ }
+ valueSerializer.open(context);
+ }
+
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(
+ SinkRecord element, KafkaSinkContext context, Long timestamp) {
+ RowKind kind = element.row().getRowKind();
+
+ byte[] keyBytes = null;
+ byte[] valueBytes = null;
+ if (keySerializer != null) {
+ keyBytes = keySerializer.serialize(element.key());
+ if (changelogMode == LogChangelogMode.ALL
+ || kind == RowKind.INSERT
+ || kind == RowKind.UPDATE_AFTER) {
+ valueBytes = valueSerializer.serialize(element.row());
+ }
+ } else {
+ valueBytes = valueSerializer.serialize(element.row());
+ }
+ return new ProducerRecord<>(topic, element.bucket(), keyBytes, valueBytes);
+ }
+}
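The value-bytes decision in serialize above can be summarized as a single predicate; this extraction is illustrative only, not part of the commit:

    import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
    import org.apache.flink.types.RowKind;

    public class TombstoneRule {
        /** Returns true when the record value should be written (non-tombstone). */
        static boolean writeValue(boolean hasKey, LogChangelogMode mode, RowKind kind) {
            if (!hasKey) {
                return true; // no primary key: every change ships the full row
            }
            // With a key, ALL mode keeps every change; otherwise only +I / +U carry
            // a value, while -U / -D become tombstones (null value).
            return mode == LogChangelogMode.ALL
                    || kind == RowKind.INSERT
                    || kind == RowKind.UPDATE_AFTER;
        }
    }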
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
new file mode 100644
index 0000000..85b9cc0
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSinkProvider.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.sink.SinkRecord;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import javax.annotation.Nullable;
+
+import java.util.Properties;
+import java.util.function.Consumer;
+
+/** A Kafka {@link LogSinkProvider}. */
+public class KafkaLogSinkProvider implements LogSinkProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String topic;
+
+ private final Properties properties;
+
+ @Nullable private final SerializationSchema<RowData> keySerializer;
+
+ private final SerializationSchema<RowData> valueSerializer;
+
+ private final LogConsistency consistency;
+
+ private final LogChangelogMode changelogMode;
+
+ public KafkaLogSinkProvider(
+ String topic,
+ Properties properties,
+ @Nullable SerializationSchema<RowData> keySerializer,
+ SerializationSchema<RowData> valueSerializer,
+ LogConsistency consistency,
+ LogChangelogMode changelogMode) {
+ this.topic = topic;
+ this.properties = properties;
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+ this.consistency = consistency;
+ this.changelogMode = changelogMode;
+ }
+
+ @Override
+ public KafkaSink<SinkRecord> createSink() {
+ KafkaSinkBuilder<SinkRecord> builder = KafkaSink.builder();
+ switch (consistency) {
+ case TRANSACTIONAL:
+ builder.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
+ .setTransactionalIdPrefix("log-store-" + topic);
+ break;
+ case EVENTUAL:
+ if (keySerializer == null) {
+ throw new IllegalArgumentException(
+ "Can not use EVENTUAL consistency mode for non-pk table.");
+ }
+ builder.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE);
+ break;
+ }
+
+ return builder.setBootstrapServers(
+ properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString())
+ .setKafkaProducerConfig(properties)
+ .setRecordSerializer(createSerializationSchema())
+ .build();
+ }
+
+ @Override
+ public Consumer<RecordMetadata> createMetadataConsumer(WriteCallback callback) {
+ return meta -> callback.onCompletion(meta.partition(), meta.offset());
+ }
+
+ @VisibleForTesting
+ KafkaLogSerializationSchema createSerializationSchema() {
+ return new KafkaLogSerializationSchema(
+ topic, keySerializer, valueSerializer, changelogMode);
+ }
+}
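A hedged usage sketch (assumes a Flink version whose DataStream#sinkTo accepts the sink2 Sink, as this module requires):

    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
    import org.apache.flink.table.store.sink.SinkRecord;

    public class LogSinkUsage {
        public static void write(DataStream<SinkRecord> records, KafkaLogSinkProvider provider) {
            // TRANSACTIONAL -> EXACTLY_ONCE with transactional id prefix "log-store-<topic>";
            // EVENTUAL -> AT_LEAST_ONCE, allowed only for tables with primary key.
            KafkaSink<SinkRecord> sink = provider.createSink();
            records.sinkTo(sink);
        }
    }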
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
new file mode 100644
index 0000000..68dbe63
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogSourceProvider.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
+import org.apache.flink.table.store.log.LogSourceProvider;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.kafka.common.TopicPartition;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+
+/** A Kafka {@link LogSourceProvider}. */
+public class KafkaLogSourceProvider implements LogSourceProvider {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String topic;
+
+ private final Properties properties;
+
+ private final DataType physicalType;
+
+ private final int[] primaryKey;
+
+ @Nullable private final DeserializationSchema<RowData> keyDeserializer;
+
+ private final DeserializationSchema<RowData> valueDeserializer;
+
+ private final LogConsistency consistency;
+
+ private final LogStartupMode scanMode;
+
+ @Nullable private final Long timestampMills;
+
+ public KafkaLogSourceProvider(
+ String topic,
+ Properties properties,
+ DataType physicalType,
+ int[] primaryKey,
+ @Nullable DeserializationSchema<RowData> keyDeserializer,
+ DeserializationSchema<RowData> valueDeserializer,
+ LogConsistency consistency,
+ LogStartupMode scanMode,
+ @Nullable Long timestampMills) {
+ this.topic = topic;
+ this.properties = properties;
+ this.physicalType = physicalType;
+ this.primaryKey = primaryKey;
+ this.keyDeserializer = keyDeserializer;
+ this.valueDeserializer = valueDeserializer;
+ this.consistency = consistency;
+ this.scanMode = scanMode;
+ this.timestampMills = timestampMills;
+ }
+
+ @Override
+ public KafkaSource<RowData> createSource(@Nullable Map<Integer, Long> bucketOffsets) {
+ switch (consistency) {
+ case TRANSACTIONAL:
+ // Add read committed for transactional consistency mode.
+ properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed");
+ break;
+ case EVENTUAL:
+ if (keyDeserializer == null) {
+ throw new IllegalArgumentException(
+ "Can not use EVENTUAL consistency mode for non-pk table.");
+ }
+ properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+ break;
+ }
+
+ return KafkaSource.<RowData>builder()
+ .setTopics(topic)
+ .setStartingOffsets(toOffsetsInitializer(bucketOffsets))
+ .setProperties(properties)
+ .setDeserializer(createDeserializationSchema())
+ .build();
+ }
+
+ @VisibleForTesting
+ KafkaRecordDeserializationSchema<RowData> createDeserializationSchema() {
+ return primaryKey.length > 0
+ ? KafkaRecordDeserializationSchema.of(
+ new KafkaLogKeyedDeserializationSchema(
+ physicalType, primaryKey, keyDeserializer, valueDeserializer))
+ : KafkaRecordDeserializationSchema.valueOnly(valueDeserializer);
+ }
+
+ private OffsetsInitializer toOffsetsInitializer(@Nullable Map<Integer, Long> bucketOffsets) {
+ switch (scanMode) {
+ case FULL:
+ return bucketOffsets == null
+ ? OffsetsInitializer.earliest()
+ : OffsetsInitializer.offsets(toKafkaOffsets(bucketOffsets));
+ case LATEST:
+ return OffsetsInitializer.latest();
+ case FROM_TIMESTAMP:
+ if (timestampMills == null) {
+ throw new NullPointerException(
+ "Must specify a timestamp if you choose timestamp startup mode.");
+ }
+ return OffsetsInitializer.timestamp(timestampMills);
+ default:
+ throw new UnsupportedOperationException("Unsupported mode: " + scanMode);
+ }
+ }
+
+ private Map<TopicPartition, Long> toKafkaOffsets(Map<Integer, Long> bucketOffsets) {
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ bucketOffsets.forEach(
+ (bucket, offset) -> offsets.put(new TopicPartition(topic, bucket), offset));
+ return offsets;
+ }
+}
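Since toKafkaOffsets maps bucket i to partition i of the table's topic, recovered bucket offsets translate directly into Kafka starting offsets; a sketch with made-up offsets:

    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;

    import java.util.HashMap;
    import java.util.Map;

    public class ResumeFromOffsets {
        public static KafkaSource<RowData> resume(KafkaLogSourceProvider provider) {
            Map<Integer, Long> bucketOffsets = new HashMap<>();
            bucketOffsets.put(0, 100L); // hypothetical offsets captured at a snapshot
            bucketOffsets.put(1, 42L);
            return provider.createSource(bucketOffsets);
        }
    }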
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
new file mode 100644
index 0000000..9208940
--- /dev/null
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
+import static org.apache.flink.table.store.log.LogOptions.FORMAT;
+import static org.apache.flink.table.store.log.LogOptions.KEY_FORMAT;
+import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import static org.apache.flink.table.store.log.LogOptions.RETENTION;
+import static org.apache.flink.table.store.log.LogOptions.SCAN;
+import static org.apache.flink.table.store.log.LogOptions.SCAN_TIMESTAMP_MILLS;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+
+/** The Kafka {@link LogStoreTableFactory} implementation. */
+public class KafkaLogStoreFactory implements LogStoreTableFactory {
+
+ public static final String IDENTIFIER = "kafka";
+
+ public static final String KAFKA_PREFIX = IDENTIFIER + ".";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(BOOTSTRAP_SERVERS);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(SCAN);
+ options.add(SCAN_TIMESTAMP_MILLS);
+ options.add(RETENTION);
+ options.add(CONSISTENCY);
+ options.add(CHANGELOG_MODE);
+ options.add(KEY_FORMAT);
+ options.add(FORMAT);
+ return options;
+ }
+
+ @Override
+ public void onCreateTable(DynamicTableFactory.Context context, int numBucket) {
+ FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
+ helper.validateExcept(KAFKA_PREFIX);
+ try (AdminClient adminClient = AdminClient.create(toKafkaProperties(helper.getOptions()))) {
+ Map<String, String> configs = new HashMap<>();
+ helper.getOptions()
+ .getOptional(RETENTION)
+ .ifPresent(
+ retention ->
+ configs.put(
+ TopicConfig.RETENTION_MS_CONFIG,
+ String.valueOf(retention.toMillis())));
+
+ NewTopic topicObj =
+ new NewTopic(topic(context), Optional.of(numBucket), Optional.empty())
+ .configs(configs);
+ adminClient.createTopics(Collections.singleton(topicObj)).all().get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new TableException("Error in createTopic", e);
+ }
+ }
+
+ @Override
+ public void onDropTable(DynamicTableFactory.Context context) {
+ try (AdminClient adminClient =
+ AdminClient.create(
+ toKafkaProperties(createTableFactoryHelper(this, context).getOptions()))) {
+ adminClient.deleteTopics(Collections.singleton(topic(context))).all().get();
+        } catch (ExecutionException e) {
+            // Ignore the failure if the topic no longer exists
+            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+                throw new TableException("Error in deleteTopic", e);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new TableException("Error in deleteTopic", e);
+        }
+ }
+
+ @Override
+ public KafkaLogSourceProvider createSourceProvider(
+ DynamicTableFactory.Context context, SourceContext sourceContext) {
+ FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
+ ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+ DataType physicalType = schema.toPhysicalRowDataType();
+ DeserializationSchema<RowData> keyDeserializer = null;
+ int[] primaryKey = schema.getPrimaryKeyIndexes();
+ if (primaryKey.length > 0) {
+ DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
+ keyDeserializer =
+ LogStoreTableFactory.getKeyDecodingFormat(helper)
+ .createRuntimeDecoder(sourceContext, keyType);
+ }
+ DeserializationSchema<RowData> valueDeserializer =
+ LogStoreTableFactory.getValueDecodingFormat(helper)
+ .createRuntimeDecoder(sourceContext, physicalType);
+ return new KafkaLogSourceProvider(
+ topic(context),
+ toKafkaProperties(helper.getOptions()),
+ physicalType,
+ primaryKey,
+ keyDeserializer,
+ valueDeserializer,
+ helper.getOptions().get(CONSISTENCY),
+ helper.getOptions().get(SCAN),
+ helper.getOptions().get(SCAN_TIMESTAMP_MILLS));
+ }
+
+ @Override
+ public KafkaLogSinkProvider createSinkProvider(
+ DynamicTableFactory.Context context, SinkContext sinkContext) {
+ FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, context);
+ ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+ DataType physicalType = schema.toPhysicalRowDataType();
+ SerializationSchema<RowData> keySerializer = null;
+ int[] primaryKey = schema.getPrimaryKeyIndexes();
+ if (primaryKey.length > 0) {
+ DataType keyType = DataTypeUtils.projectRow(physicalType, primaryKey);
+ keySerializer =
+ LogStoreTableFactory.getKeyEncodingFormat(helper)
+ .createRuntimeEncoder(sinkContext, keyType);
+ }
+ SerializationSchema<RowData> valueSerializer =
+ LogStoreTableFactory.getValueEncodingFormat(helper)
+ .createRuntimeEncoder(sinkContext, physicalType);
+ return new KafkaLogSinkProvider(
+ topic(context),
+ toKafkaProperties(helper.getOptions()),
+ keySerializer,
+ valueSerializer,
+ helper.getOptions().get(CONSISTENCY),
+ helper.getOptions().get(CHANGELOG_MODE));
+ }
+
+ private static String topic(DynamicTableFactory.Context context) {
+ return context.getObjectIdentifier().asSummaryString();
+ }
+
+ public static Properties toKafkaProperties(ReadableConfig options) {
+ Properties properties = new Properties();
+ Map<String, String> optionMap = ((Configuration) options).toMap();
+ optionMap.keySet().stream()
+ .filter(key -> key.startsWith(KAFKA_PREFIX))
+ .forEach(
+ key ->
+ properties.put(
+ key.substring((KAFKA_PREFIX).length()),
+ optionMap.get(key)));
+
+        // Force read_committed isolation for the transactional consistency mode.
+ if (options.get(CONSISTENCY) == LogConsistency.TRANSACTIONAL) {
+ properties.setProperty(ISOLATION_LEVEL_CONFIG, "read_committed");
+ }
+ return properties;
+ }
+}
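
For orientation, a minimal sketch of what toKafkaProperties above produces. Only the "kafka." prefix stripping and the CONSISTENCY handling come from this commit; the transaction timeout key and the sketch class itself are illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
    import org.apache.flink.table.store.log.LogOptions;

    import java.util.Properties;

    public class ToKafkaPropertiesSketch {
        public static void main(String[] args) {
            Configuration options = new Configuration();
            // Options under the "kafka." prefix are forwarded to the Kafka
            // client with the prefix stripped.
            options.setString("kafka.transaction.timeout.ms", "60000");
            options.set(LogOptions.CONSISTENCY, LogOptions.LogConsistency.TRANSACTIONAL);

            Properties props = KafkaLogStoreFactory.toKafkaProperties(options);
            // Expected contents: transaction.timeout.ms=60000 and, because of
            // TRANSACTIONAL consistency, isolation.level=read_committed.
            System.out.println(props);
        }
    }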
diff --git a/flink-table-store-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table-store-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..df0c6b8
--- /dev/null
+++ b/flink-table-store-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.kafka.KafkaLogStoreFactory
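
This single service entry is what makes the factory discoverable: Flink's factory discovery is backed by Java's ServiceLoader reading this META-INF/services file. A rough sketch of the lookup, under the assumption that discoverLogStoreFactory iterates registered factories and matches on factoryIdentifier() (the helper class below is illustrative, not the exact implementation):

    import org.apache.flink.table.factories.Factory;

    import java.util.ServiceLoader;

    class FactoryDiscoverySketch {
        static Factory find(ClassLoader classLoader, String identifier) {
            // ServiceLoader instantiates every factory listed in
            // META-INF/services/org.apache.flink.table.factories.Factory.
            for (Factory factory : ServiceLoader.load(Factory.class, classLoader)) {
                if (factory.factoryIdentifier().equals(identifier)) {
                    return factory; // e.g. "kafka" -> KafkaLogStoreFactory
                }
            }
            throw new IllegalStateException("No factory found for " + identifier);
        }
    }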
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
new file mode 100644
index 0000000..8b9dccf
--- /dev/null
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogITCase.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.log.LogOptions.LogConsistency;
+import org.apache.flink.types.RowKind;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SINK_CONTEXT;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.SOURCE_CONTEXT;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testContext;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for {@link KafkaLogStoreFactory}. */
+public class KafkaLogITCase extends KafkaTableTestBase {
+
+ private final KafkaLogStoreFactory factory = discoverKafkaLogFactory();
+
+ @Test
+ public void testDropEmpty() {
+ // Expect no exceptions to be thrown
+ factory.onDropTable(testContext(getBootstrapServers(), LogChangelogMode.AUTO, true));
+ }
+
+ @Test
+ public void testUpsertTransactionKeyed() throws Exception {
+ innerTest(
+ "UpsertTransactionKeyed",
+ LogChangelogMode.UPSERT,
+ LogConsistency.TRANSACTIONAL,
+ true);
+ }
+
+ @Test
+ public void testAllTransactionKeyed() throws Exception {
+ innerTest("AllTransactionKeyed", LogChangelogMode.ALL, LogConsistency.TRANSACTIONAL, true);
+ }
+
+ @Test
+ public void testUpsertEventualKeyed() throws Exception {
+ innerTest("UpsertEventualKeyed", LogChangelogMode.UPSERT, LogConsistency.EVENTUAL, true);
+ }
+
+ @Test
+ public void testAllEventualKeyed() throws Exception {
+ innerTest("AllEventualKeyed", LogChangelogMode.ALL, LogConsistency.EVENTUAL, true);
+ }
+
+ @Test
+ public void testAllTransactionNonKeyed() throws Exception {
+ innerTest(
+ "AllTransactionNonKeyed",
+ LogChangelogMode.ALL,
+ LogConsistency.TRANSACTIONAL,
+ false);
+ }
+
+ @Test
+ public void testUpsertTransactionNonKeyed() {
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ innerTest(
+ "UpsertTransactionNonKeyed",
+ LogChangelogMode.UPSERT,
+ LogConsistency.TRANSACTIONAL,
+ false));
+ assertThat(exception.getMessage())
+ .isEqualTo("Can not use upsert changelog mode for non-pk table.");
+ }
+
+ @Test
+ public void testUpsertEventualNonKeyed() {
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ innerTest(
+ "UpsertEventualNonKeyed",
+ LogChangelogMode.UPSERT,
+ LogConsistency.EVENTUAL,
+ false));
+ assertThat(exception.getMessage())
+ .isEqualTo("Can not use EVENTUAL consistency mode for non-pk table.");
+ }
+
+ @Test
+ public void testAllEventualNonKeyed() {
+ IllegalArgumentException exception =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ innerTest(
+ "AllEventualNonKeyed",
+ LogChangelogMode.ALL,
+ LogConsistency.EVENTUAL,
+ false));
+ assertThat(exception.getMessage())
+ .isEqualTo("Can not use EVENTUAL consistency mode for non-pk table.");
+ }
+
+ private void innerTest(
+ String name, LogChangelogMode changelogMode, LogConsistency consistency, boolean keyed)
+ throws Exception {
+ Context context =
+ testContext(name, getBootstrapServers(), changelogMode, consistency, keyed);
+
+ KafkaLogSinkProvider sinkProvider = factory.createSinkProvider(context, SINK_CONTEXT);
+ KafkaLogSourceProvider sourceProvider =
+ factory.createSourceProvider(context, SOURCE_CONTEXT);
+
+ factory.onCreateTable(context, 3);
+ try {
+            // Transactional mode needs checkpoints so that writes are committed
+ enableCheckpoint();
+
+ // 1.1 sink
+ String uuid = UUID.randomUUID().toString();
+ env.fromElements(
+ testRecord(true, 2, 1, 2, RowKind.DELETE),
+ testRecord(true, 1, 3, 4, RowKind.INSERT),
+ testRecord(true, 0, 5, 6, RowKind.INSERT),
+ testRecord(true, 0, 7, 8, RowKind.INSERT))
+ .sinkTo(new TestOffsetsLogSink<>(sinkProvider, uuid));
+ env.execute();
+
+ // 1.2 read
+ List<RowData> records =
+ env.fromSource(
+ sourceProvider.createSource(null),
+ WatermarkStrategy.noWatermarks(),
+ "source")
+ .executeAndCollect(4);
+ records.sort(Comparator.comparingInt(o -> o.getInt(0)));
+
+ // delete, upsert mode
+ if (changelogMode == LogChangelogMode.UPSERT) {
+ assertRow(records.get(0), RowKind.DELETE, 1, null);
+ } else {
+ assertRow(records.get(0), RowKind.DELETE, 1, 2);
+ }
+
+ // inserts
+ assertRow(records.get(1), RowKind.INSERT, 3, 4);
+ assertRow(records.get(2), RowKind.INSERT, 5, 6);
+ assertRow(records.get(3), RowKind.INSERT, 7, 8);
+
+ // 2.1 sink
+ env.fromElements(
+ testRecord(true, 0, 9, 10, RowKind.INSERT),
+ testRecord(true, 1, 11, 12, RowKind.INSERT),
+ testRecord(true, 2, 13, 14, RowKind.INSERT))
+ .sinkTo(new TestOffsetsLogSink<>(sinkProvider, UUID.randomUUID().toString()));
+ env.execute();
+
+ // 2.2 read from offsets
+ records =
+ env.fromSource(
+ sourceProvider.createSource(
+ TestOffsetsLogSink.drainOffsets(uuid)),
+ WatermarkStrategy.noWatermarks(),
+ "source")
+ .executeAndCollect(3);
+ records.sort(Comparator.comparingInt(o -> o.getInt(0)));
+ assertRow(records.get(0), RowKind.INSERT, 9, 10);
+ assertRow(records.get(1), RowKind.INSERT, 11, 12);
+ assertRow(records.get(2), RowKind.INSERT, 13, 14);
+ } finally {
+ factory.onDropTable(context);
+ }
+ }
+
+ private void enableCheckpoint() {
+ Configuration configuration = new Configuration();
+ configuration.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ env.configure(configuration);
+ env.enableCheckpointing(1000);
+ }
+
+ private void assertRow(RowData row, RowKind rowKind, Integer k, Integer v) {
+ Assert.assertEquals(rowKind, row.getRowKind());
+ Assert.assertEquals(k, row.isNullAt(0) ? null : row.getInt(0));
+ Assert.assertEquals(v, row.isNullAt(1) ? null : row.getInt(1));
+ }
+}
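
A note on the branch in the assertions above: under UPSERT changelog mode the value payload of a DELETE is not written to the log, so it is read back with null value columns, while ALL mode keeps the full row. Sketched with the same GenericRowData shapes the test asserts against (a fragment, not a full test):

    // UPSERT mode: only the key column survives a DELETE.
    GenericRowData upsertDelete = GenericRowData.ofKind(RowKind.DELETE, 1, null);
    // ALL mode: the DELETE retains its value column as well.
    GenericRowData allDelete = GenericRowData.ofKind(RowKind.DELETE, 1, 2);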
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
new file mode 100644
index 0000000..c8461a0
--- /dev/null
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogSerializationTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.discoverKafkaLogFactory;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testContext;
+import static org.apache.flink.table.store.kafka.KafkaLogTestUtils.testRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link KafkaLogSerializationSchema} and {@link KafkaLogKeyedDeserializationSchema}. */
+public class KafkaLogSerializationTest {
+
+ private static final String TOPIC = "my_topic";
+
+ @Test
+ public void testKeyed() throws Exception {
+ checkKeyed(LogChangelogMode.AUTO, 1, 3, 5);
+ checkKeyed(LogChangelogMode.UPSERT, 3, 6, 9);
+ checkKeyed(LogChangelogMode.ALL, 2, 5, 3);
+ }
+
+ @Test
+ public void testNonKeyedUpsert() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> checkNonKeyed(LogChangelogMode.UPSERT, 3, 6, 9));
+ }
+
+ @Test
+ public void testNonKeyed() throws Exception {
+ checkNonKeyed(LogChangelogMode.AUTO, 1, 3, 5);
+ checkNonKeyed(LogChangelogMode.ALL, 2, 5, 3);
+ }
+
+ private void checkKeyed(LogChangelogMode mode, int bucket, int key, int value)
+ throws Exception {
+ check(mode, true, bucket, key, value, RowKind.INSERT);
+ check(mode, true, bucket, key, value, RowKind.UPDATE_BEFORE);
+ check(mode, true, bucket, key, value, RowKind.UPDATE_AFTER);
+ check(mode, true, bucket, key, value, RowKind.DELETE);
+ }
+
+ private void checkNonKeyed(LogChangelogMode mode, int bucket, int key, int value)
+ throws Exception {
+ check(mode, false, bucket, key, value, RowKind.INSERT);
+ check(mode, false, bucket, key, value, RowKind.UPDATE_BEFORE);
+ check(mode, false, bucket, key, value, RowKind.UPDATE_AFTER);
+ check(mode, false, bucket, key, value, RowKind.DELETE);
+ }
+
+ private void check(
+ LogChangelogMode mode, boolean keyed, int bucket, int key, int value, RowKind rowKind)
+ throws Exception {
+ KafkaLogSerializationSchema serializer =
+ createTestSerializationSchema(testContext("", mode, keyed));
+ serializer.open(null, null);
+ KafkaRecordDeserializationSchema<RowData> deserializer =
+ createTestDeserializationSchema(testContext("", mode, keyed));
+ deserializer.open(null);
+
+ SinkRecord input = testRecord(keyed, bucket, key, value, rowKind);
+ ProducerRecord<byte[], byte[]> record = serializer.serialize(input, null, null);
+
+ assertThat(record.partition().intValue()).isEqualTo(bucket);
+
+ AtomicReference<RowData> rowReference = new AtomicReference<>();
+ deserializer.deserialize(
+ toConsumerRecord(record),
+ new Collector<RowData>() {
+ @Override
+ public void collect(RowData record) {
+ if (rowReference.get() != null) {
+                            throw new RuntimeException("Expected exactly one record");
+ }
+ rowReference.set(record);
+ }
+
+ @Override
+ public void close() {}
+ });
+ RowData row = rowReference.get();
+
+ if (rowKind == RowKind.UPDATE_BEFORE) {
+ assertThat(row.getRowKind()).isEqualTo(RowKind.DELETE);
+ } else if (rowKind == RowKind.UPDATE_AFTER) {
+ assertThat(row.getRowKind()).isEqualTo(RowKind.INSERT);
+ } else {
+ assertThat(row.getRowKind()).isEqualTo(rowKind);
+ }
+ assertThat(row.getInt(0)).isEqualTo(key);
+ if (row.getRowKind() == RowKind.INSERT || mode == LogChangelogMode.ALL || !keyed) {
+ assertThat(row.getInt(1)).isEqualTo(value);
+ } else {
+ assertThat(row.isNullAt(1)).isTrue();
+ }
+ }
+
+ private ConsumerRecord<byte[], byte[]> toConsumerRecord(ProducerRecord<byte[], byte[]> record) {
+ return new ConsumerRecord<>(TOPIC, record.partition(), 0, record.key(), record.value());
+ }
+
+ private static KafkaLogSerializationSchema createTestSerializationSchema(
+ DynamicTableFactory.Context context) {
+ return discoverKafkaLogFactory()
+ .createSinkProvider(context, KafkaLogTestUtils.SINK_CONTEXT)
+ .createSerializationSchema();
+ }
+
+ private static KafkaRecordDeserializationSchema<RowData> createTestDeserializationSchema(
+ DynamicTableFactory.Context context) {
+ return discoverKafkaLogFactory()
+ .createSourceProvider(context, KafkaLogTestUtils.SOURCE_CONTEXT)
+ .createDeserializationSchema();
+ }
+}
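
The kind normalization asserted above means a reader of the log only ever observes INSERT and DELETE: UPDATE_BEFORE comes back as DELETE and UPDATE_AFTER as INSERT. A hedged sketch of a consumer relying on that contract (the method name and handling are illustrative):

    static void applyToState(RowData row) {
        // Only two row kinds remain after the log round trip.
        if (row.getRowKind() == RowKind.INSERT) {
            // Upsert the key/value pair into downstream state.
        } else if (row.getRowKind() == RowKind.DELETE) {
            // Remove the key; value columns may be null in UPSERT mode.
        }
    }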
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
new file mode 100644
index 0000000..0fad212
--- /dev/null
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.store.sink.SinkRecord;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
+import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
+import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
+import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode;
+import static org.apache.flink.table.store.log.LogOptions.LogConsistency;
+
+/** Test utilities for {@link KafkaLogStoreFactory}. */
+public class KafkaLogTestUtils {
+
+ static final LogStoreTableFactory.SourceContext SOURCE_CONTEXT =
+ new LogStoreTableFactory.SourceContext() {
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
+ return createTypeInformation(
+ TypeConversions.fromDataToLogicalType(producedDataType));
+ }
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(
+ LogicalType producedLogicalType) {
+ return InternalTypeInfo.of(producedLogicalType);
+ }
+
+ @Override
+ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
+ DataType producedDataType) {
+ return ScanRuntimeProviderContext.INSTANCE.createDataStructureConverter(
+ producedDataType);
+ }
+ };
+
+ static final LogStoreTableFactory.SinkContext SINK_CONTEXT =
+ new LogStoreTableFactory.SinkContext() {
+
+ @Override
+ public boolean isBounded() {
+ return false;
+ }
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
+ return createTypeInformation(
+ TypeConversions.fromDataToLogicalType(producedDataType));
+ }
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(
+ LogicalType producedLogicalType) {
+ return InternalTypeInfo.of(producedLogicalType);
+ }
+
+ @Override
+ public DynamicTableSink.DataStructureConverter createDataStructureConverter(
+ DataType producedDataType) {
+ return new SinkRuntimeProviderContext(isBounded())
+ .createDataStructureConverter(producedDataType);
+ }
+ };
+
+ static KafkaLogStoreFactory discoverKafkaLogFactory() {
+ return (KafkaLogStoreFactory)
+ LogStoreTableFactory.discoverLogStoreFactory(
+ Thread.currentThread().getContextClassLoader(),
+ KafkaLogStoreFactory.IDENTIFIER);
+ }
+
+ private static DynamicTableFactory.Context createContext(
+ String name, RowType rowType, int[] pk, Map<String, String> options) {
+ return new FactoryUtil.DefaultDynamicTableContext(
+ ObjectIdentifier.of("catalog", "database", name),
+ KafkaLogTestUtils.createResolvedTable(options, rowType, pk),
+ Collections.emptyMap(),
+ new Configuration(),
+ Thread.currentThread().getContextClassLoader(),
+ false);
+ }
+
+ static ResolvedCatalogTable createResolvedTable(
+ Map<String, String> options, RowType rowType, int[] pk) {
+ List<String> fieldNames = rowType.getFieldNames();
+ List<DataType> fieldDataTypes =
+ rowType.getChildren().stream()
+ .map(TypeConversions::fromLogicalToDataType)
+ .collect(Collectors.toList());
+ CatalogTable origin =
+ CatalogTable.of(
+ Schema.newBuilder().fromFields(fieldNames, fieldDataTypes).build(),
+ null,
+ Collections.emptyList(),
+ options);
+ List<Column> resolvedColumns =
+ IntStream.range(0, fieldNames.size())
+ .mapToObj(i -> Column.physical(fieldNames.get(i), fieldDataTypes.get(i)))
+ .collect(Collectors.toList());
+ UniqueConstraint constraint = null;
+ if (pk.length > 0) {
+ List<String> pkNames =
+ Arrays.stream(pk).mapToObj(fieldNames::get).collect(Collectors.toList());
+ constraint = UniqueConstraint.primaryKey("pk", pkNames);
+ }
+ return new ResolvedCatalogTable(
+ origin, new ResolvedSchema(resolvedColumns, Collections.emptyList(), constraint));
+ }
+
+ static DynamicTableFactory.Context testContext(
+ String servers, LogChangelogMode changelogMode, boolean keyed) {
+ return testContext("table", servers, changelogMode, LogConsistency.TRANSACTIONAL, keyed);
+ }
+
+ static DynamicTableFactory.Context testContext(
+ String name,
+ String servers,
+ LogChangelogMode changelogMode,
+ LogConsistency consistency,
+ boolean keyed) {
+ Map<String, String> options = new HashMap<>();
+ options.put(CHANGELOG_MODE.key(), changelogMode.toString());
+ options.put(CONSISTENCY.key(), consistency.toString());
+ options.put(BOOTSTRAP_SERVERS.key(), servers);
+ return createContext(
+ name,
+ RowType.of(new IntType(), new IntType()),
+ keyed ? new int[] {0} : new int[0],
+ options);
+ }
+
+ static SinkRecord testRecord(boolean keyed, int bucket, int key, int value, RowKind rowKind) {
+ return new SinkRecord(
+ EMPTY_ROW,
+ bucket,
+ keyed ? row(key) : EMPTY_ROW,
+ GenericRowData.ofKind(rowKind, key, value));
+ }
+}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
new file mode 100644
index 0000000..dbe7897
--- /dev/null
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaTableTestBase.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
+
+/** Base class for Kafka Table IT Cases. */
+public abstract class KafkaTableTestBase extends AbstractTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaTableTestBase.class);
+
+ private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+ private static final Network NETWORK = Network.newNetwork();
+    private static final int ZK_TIMEOUT_MILLIS = 30000;
+
+ @ClassRule
+ public static final KafkaContainer KAFKA_CONTAINER =
+ new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA)) {
+ @Override
+ protected void doStart() {
+ super.doStart();
+ if (LOG.isInfoEnabled()) {
+ this.followOutput(new Slf4jLogConsumer(LOG));
+ }
+ }
+ }.withEmbeddedZookeeper()
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
+ .withEnv(
+ "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
+ String.valueOf(Duration.ofHours(2).toMillis()))
+                            // Disable log deletion to prevent records from being deleted during the test run
+ .withEnv("KAFKA_LOG_RETENTION_MS", "-1");
+
+ protected StreamExecutionEnvironment env;
+ protected StreamTableEnvironment tEnv;
+
+    // Timer for scheduling a periodic debug-logging task in case the test hangs
+ private final Timer loggingTimer = new Timer("Debug Logging Timer");
+
+ @Before
+ public void setup() {
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ tEnv = StreamTableEnvironment.create(env);
+ env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+
+        // Probe Kafka broker status every 30 seconds
+ scheduleTimeoutLogger(
+ Duration.ofSeconds(30),
+ () -> {
+ // List all non-internal topics
+ final Map<String, TopicDescription> topicDescriptions =
+ describeExternalTopics();
+ LOG.info("Current existing topics: {}", topicDescriptions.keySet());
+
+ // Log status of topics
+ logTopicPartitionStatus(topicDescriptions);
+ });
+ }
+
+ @After
+ public void after() {
+ // Cancel timer for debug logging
+ cancelTimeoutLogger();
+ }
+
+ public Properties getStandardProps() {
+ Properties standardProps = new Properties();
+ standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
+ standardProps.put("group.id", "flink-tests");
+ standardProps.put("enable.auto.commit", false);
+ standardProps.put("auto.offset.reset", "earliest");
+ standardProps.put("max.partition.fetch.bytes", 256);
+ standardProps.put("zookeeper.session.timeout.ms", zkTimeoutMills);
+ standardProps.put("zookeeper.connection.timeout.ms", zkTimeoutMills);
+ return standardProps;
+ }
+
+ public String getBootstrapServers() {
+ return KAFKA_CONTAINER.getBootstrapServers();
+ }
+
+ // ------------------------ For Debug Logging Purpose ----------------------------------
+
+ private void scheduleTimeoutLogger(Duration period, Runnable loggingAction) {
+ TimerTask timeoutLoggerTask =
+ new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ loggingAction.run();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to execute logging action", e);
+ }
+ }
+ };
+ loggingTimer.schedule(timeoutLoggerTask, 0L, period.toMillis());
+ }
+
+ private void cancelTimeoutLogger() {
+ loggingTimer.cancel();
+ }
+
+ private Map<String, TopicDescription> describeExternalTopics() {
+ try (final AdminClient adminClient = AdminClient.create(getStandardProps())) {
+ final List<String> topics =
+ adminClient.listTopics().listings().get().stream()
+ .filter(listing -> !listing.isInternal())
+ .map(TopicListing::name)
+ .collect(Collectors.toList());
+
+ return adminClient.describeTopics(topics).all().get();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to list Kafka topics", e);
+ }
+ }
+
+ private void logTopicPartitionStatus(Map<String, TopicDescription> topicDescriptions) {
+ final Properties properties = getStandardProps();
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-tests-debugging");
+ properties.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getCanonicalName());
+ properties.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ StringDeserializer.class.getCanonicalName());
+        // Close the consumer after probing to avoid leaking connections between probes
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
+            List<TopicPartition> partitions = new ArrayList<>();
+            topicDescriptions.forEach(
+                    (topic, description) ->
+                            description
+                                    .partitions()
+                                    .forEach(
+                                            tpInfo ->
+                                                    partitions.add(
+                                                            new TopicPartition(
+                                                                    topic,
+                                                                    tpInfo.partition()))));
+            final Map<TopicPartition, Long> beginningOffsets =
+                    consumer.beginningOffsets(partitions);
+            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
+            partitions.forEach(
+                    partition ->
+                            LOG.info(
+                                    "TopicPartition \"{}\": starting offset: {}, stopping offset: {}",
+                                    partition,
+                                    beginningOffsets.get(partition),
+                                    endOffsets.get(partition)));
+        }
+ }
+}
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
new file mode 100644
index 0000000..e06d527
--- /dev/null
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/TestOffsetsLogSink.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.kafka;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.store.log.LogInitContext;
+import org.apache.flink.table.store.log.LogSinkProvider;
+import org.apache.flink.table.store.sink.SinkRecord;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+/** Test Kafka {@link Sink} that records the committed offset per bucket for later reads. */
+public class TestOffsetsLogSink<
+ WriterT extends
+ StatefulSinkWriter<SinkRecord, WriterStateT>
+ & PrecommittingSinkWriter<SinkRecord, CommT>,
+ CommT,
+ WriterStateT>
+ implements StatefulSink<SinkRecord, WriterStateT>,
+ TwoPhaseCommittingSink<SinkRecord, CommT> {
+
+ private static final Map<String, Map<Integer, Long>> GLOBAL_OFFSETS = new ConcurrentHashMap<>();
+
+ private final LogSinkProvider sinkProvider;
+ private final String uuid;
+ private final Sink<SinkRecord> sink;
+
+ public TestOffsetsLogSink(LogSinkProvider sinkProvider, String uuid) {
+ this.sinkProvider = sinkProvider;
+ this.uuid = uuid;
+ this.sink = sinkProvider.createSink();
+ }
+
+ public static Map<Integer, Long> drainOffsets(String uuid) {
+ return GLOBAL_OFFSETS.remove(uuid);
+ }
+
+ @Override
+ public WriterT createWriter(InitContext initContext) throws IOException {
+ return (WriterT) sink.createWriter(wrapContext(initContext));
+ }
+
+ private InitContext wrapContext(InitContext initContext) {
+ Consumer<?> consumer =
+ sinkProvider.createMetadataConsumer(
+ (bucket, offset) -> {
+ Map<Integer, Long> offsets =
+ GLOBAL_OFFSETS.computeIfAbsent(
+ uuid, k -> new ConcurrentHashMap<>());
+ long nextOffset = offset + 1;
+ offsets.compute(
+ bucket,
+ (k, v) -> v == null ? nextOffset : Math.max(v, nextOffset));
+ });
+ return new LogInitContext(initContext, consumer);
+ }
+
+ @Override
+ public StatefulSinkWriter<SinkRecord, WriterStateT> restoreWriter(
+ InitContext initContext, Collection<WriterStateT> collection) throws IOException {
+ return ((StatefulSink<SinkRecord, WriterStateT>) sink)
+ .restoreWriter(wrapContext(initContext), collection);
+ }
+
+ @Override
+ public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
+ return ((StatefulSink<SinkRecord, WriterStateT>) sink).getWriterStateSerializer();
+ }
+
+ @Override
+ public Committer<CommT> createCommitter() throws IOException {
+ return ((TwoPhaseCommittingSink<SinkRecord, CommT>) sink).createCommitter();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<CommT> getCommittableSerializer() {
+ return ((TwoPhaseCommittingSink<SinkRecord, CommT>) sink).getCommittableSerializer();
+ }
+}
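
Typical usage, matching KafkaLogITCase above: sink with a fresh UUID, then feed the drained per-bucket offsets back into the source provider. The stream variable is illustrative; the remaining names are from this commit:

    String uuid = UUID.randomUUID().toString();
    stream.sinkTo(new TestOffsetsLogSink<>(sinkProvider, uuid));
    env.execute();

    // Per bucket, drainOffsets returns the next offset to read (last
    // committed offset + 1, as maintained by the metadata consumer).
    Map<Integer, Long> offsets = TestOffsetsLogSink.drainOffsets(uuid);
    env.fromSource(
            sourceProvider.createSource(offsets),
            WatermarkStrategy.noWatermarks(),
            "source");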
diff --git a/flink-table-store-kafka/src/test/resources/log4j2-test.properties b/flink-table-store-kafka/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..863665c
--- /dev/null
+++ b/flink-table-store-kafka/src/test/resources/log4j2-test.properties
@@ -0,0 +1,38 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to avoid flooding build logs;
+# set it manually to INFO for debugging purposes.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+
+logger.kafka.name = kafka
+logger.kafka.level = OFF
+logger.kafka2.name = state.change
+logger.kafka2.level = OFF
+
+logger.zookeeper.name = org.apache.zookeeper
+logger.zookeeper.level = OFF
+logger.I0Itec.name = org.I0Itec
+logger.I0Itec.level = OFF
diff --git a/pom.xml b/pom.xml
index 5046318..7d048b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@ under the License.
<modules>
<module>flink-table-store-core</module>
<module>flink-table-store-connector</module>
+ <module>flink-table-store-kafka</module>
</modules>
<properties>
@@ -61,6 +62,7 @@ under the License.
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.15</slf4j.version>
<log4j.version>2.17.1</log4j.version>
+ <junit4.version>4.13.2</junit4.version>
<junit5.version>5.8.1</junit5.version>
<spotless.version>2.4.2</spotless.version>
<target.java.version>1.8</target.java.version>
@@ -68,6 +70,7 @@ under the License.
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
<flink.forkCount>1C</flink.forkCount>
<flink.reuseForks>true</flink.reuseForks>
+ <testcontainers.version>1.16.2</testcontainers.version>
<!-- Can be set to any value to reproduce a specific build. -->
<test.randomization.seed/>