Posted to commits@flink.apache.org by dw...@apache.org on 2023/01/31 22:53:20 UTC
[flink] branch master updated: [FLINK-24456][Connectors / Kafka,Table SQL / Ecosystem] Support bounded offset in the Kafka table connector
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9e8a99c12ce [FLINK-24456][Connectors / Kafka,Table SQL / Ecosystem] Support bounded offset in the Kafka table connector
9e8a99c12ce is described below
commit 9e8a99c12ce939418086fa9555c1c4f74bcf6b59
Author: shihong90 <25...@qq.com>
AuthorDate: Thu Dec 2 20:42:17 2021 +0800
[FLINK-24456][Connectors / Kafka,Table SQL / Ecosystem] Support bounded offset in the Kafka table connector
Co-authored-by: shihong90 <25...@qq.com>
This closes #21808
---
docs/content.zh/docs/connectors/table/kafka.md | 44 +++-
docs/content/docs/connectors/table/kafka.md | 45 +++-
.../flink/connector/kafka/source/KafkaSource.java | 5 +
.../connectors/kafka/config/BoundedMode.java | 49 +++++
.../kafka/table/KafkaConnectorOptions.java | 60 ++++++
.../kafka/table/KafkaConnectorOptionsUtil.java | 118 ++++++++++-
.../connectors/kafka/table/KafkaDynamicSource.java | 63 +++++-
.../kafka/table/KafkaDynamicTableFactory.java | 20 ++
.../table/UpsertKafkaDynamicTableFactory.java | 4 +
.../kafka/source/KafkaSourceTestUtils.java | 6 +
.../kafka/table/KafkaDynamicTableFactoryTest.java | 228 ++++++++++++++++++++-
.../connectors/kafka/table/KafkaTableITCase.java | 125 +++++++++++
.../table/UpsertKafkaDynamicTableFactoryTest.java | 4 +
13 files changed, 762 insertions(+), 9 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/kafka.md b/docs/content.zh/docs/connectors/table/kafka.md
index b57da402ebd..408cb1a2f05 100644
--- a/docs/content.zh/docs/connectors/table/kafka.md
+++ b/docs/content.zh/docs/connectors/table/kafka.md
@@ -290,7 +290,7 @@ CREATE TABLE KafkaTable (
<td><h5>scan.startup.mode</h5></td>
<td>可选</td>
<td style="word-wrap: break-word;">group-offsets</td>
- <td>String</td>
+ <td>Enum</td>
<td>Kafka consumer 的启动模式。有效值为:<code>'earliest-offset'</code>,<code>'latest-offset'</code>,<code>'group-offsets'</code>,<code>'timestamp'</code> 和 <code>'specific-offsets'</code>。
请参阅下方 <a href="#起始消费位点">起始消费位点</a> 以获取更多细节。</td>
</tr>
@@ -309,6 +309,32 @@ CREATE TABLE KafkaTable (
<td>Long</td>
<td>在使用 <code>'timestamp'</code> 启动模式时指定启动的时间戳(单位毫秒)。</td>
</tr>
+ <tr>
+ <td><h5>scan.bounded.mode</h5></td>
+ <td>可选</td>
+ <td style="word-wrap: break-word;">unbounded</td>
+ <td>Enum</td>
+ <td>Bounded mode for Kafka consumer, valid values are <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>.
+ See the following <a href="#bounded-ending-position">Bounded Ending Position</a> for more details.</td>
+ </tr>
+ <tr>
+ <td><h5>scan.bounded.specific-offsets</h5></td>
+ <td>可选</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify offsets for each partition in case of <code>'specific-offsets'</code> bounded mode, e.g. <code>'partition:0,offset:42;partition:1,offset:300'</code>.
+ If an offset for a partition is not provided, the connector will not consume from that partition.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>scan.bounded.timestamp-millis</h5></td>
+ <td>可选</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>End at the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> bounded mode.</td>
+ </tr>
<tr>
<td><h5>scan.topic-partition-discovery.interval</h5></td>
<td>可选</td>
@@ -485,6 +511,22 @@ ROW<`version` INT, `behavior` STRING>
如果使用了 `specific-offsets`,必须使用另外一个配置项 `scan.startup.specific-offsets` 来为每个 partition 指定起始偏移量,
例如,选项值 `partition:0,offset:42;partition:1,offset:300` 表示 partition `0` 从偏移量 `42` 开始,partition `1` 从偏移量 `300` 开始。
+### Bounded Ending Position
+
+The config option `scan.bounded.mode` specifies the bounded mode for the Kafka consumer. The valid enumerations are:
+<ul>
+<li><span markdown="span">`group-offsets`</span>: bounded by committed offsets in ZooKeeper / Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.</li>
+<li><span markdown="span">`latest-offset`</span>: bounded by latest offsets. This is evaluated at the start of consumption from a given partition.</li>
+<li><span markdown="span">`timestamp`</span>: bounded by a user-supplied timestamp.</li>
+<li><span markdown="span">`specific-offsets`</span>: bounded by user-supplied specific offsets for each partition.</li>
+</ul>
+
+If the config option `scan.bounded.mode` is not set, the table is unbounded by default.
+
+If `timestamp` is specified, another config option `scan.bounded.timestamp-millis` is required to specify a specific bounded timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.
+
+If `specific-offsets` is specified, another config option `scan.bounded.specific-offsets` is required to specify specific bounded offsets for each partition,
+e.g. an option value `partition:0,offset:42;partition:1,offset:300` indicates offset `42` for partition `0` and offset `300` for partition `1`. If an offset for a partition is not provided, the connector will not consume from that partition.
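+
+For example, the following DDL (topic, connection properties, and format are placeholders) reads from the earliest offset and consumes partition `0` up to offset `42` (exclusive):
+
+```sql
+CREATE TABLE KafkaBoundedTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'user_behavior',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'csv',
+  'scan.startup.mode' = 'earliest-offset',
+  'scan.bounded.mode' = 'specific-offsets',
+  'scan.bounded.specific-offsets' = 'partition:0,offset:42'
+);
+```
+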
### CDC 变更日志(Changelog) Source
diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md
index a5d85989de1..3c9e739c28c 100644
--- a/docs/content/docs/connectors/table/kafka.md
+++ b/docs/content/docs/connectors/table/kafka.md
@@ -313,7 +313,7 @@ Connector Options
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">group-offsets</td>
- <td>String</td>
+ <td>Enum</td>
<td>Startup mode for Kafka consumer, valid values are <code>'earliest-offset'</code>, <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>.
See the following <a href="#start-reading-position">Start Reading Position</a> for more details.</td>
</tr>
@@ -334,6 +334,32 @@ Connector Options
<td>Long</td>
<td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td>
</tr>
+ <tr>
+ <td><h5>scan.bounded.mode</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">unbounded</td>
+ <td>Enum</td>
+ <td>Bounded mode for Kafka consumer, valid values are <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>.
+ See the following <a href="#bounded-ending-position">Bounded Ending Position</a> for more details.</td>
+ </tr>
+ <tr>
+ <td><h5>scan.bounded.specific-offsets</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify offsets for each partition in case of <code>'specific-offsets'</code> bounded mode, e.g. <code>'partition:0,offset:42;partition:1,offset:300'</code>.
+ If an offset for a partition is not provided, the connector will not consume from that partition.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>scan.bounded.timestamp-millis</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>End at the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> bounded mode.</td>
+ </tr>
<tr>
<td><h5>scan.topic-partition-discovery.interval</h5></td>
<td>optional</td>
@@ -535,6 +561,23 @@ If `timestamp` is specified, another config option `scan.startup.timestamp-milli
If `specific-offsets` is specified, another config option `scan.startup.specific-offsets` is required to specify specific startup offsets for each partition,
e.g. an option value `partition:0,offset:42;partition:1,offset:300` indicates offset `42` for partition `0` and offset `300` for partition `1`.
+### Bounded Ending Position
+
+The config option `scan.bounded.mode` specifies the bounded mode for the Kafka consumer. The valid enumerations are:
+<ul>
+<li><span markdown="span">`group-offsets`</span>: bounded by committed offsets in ZooKeeper / Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.</li>
+<li><span markdown="span">`latest-offset`</span>: bounded by latest offsets. This is evaluated at the start of consumption from a given partition.</li>
+<li><span markdown="span">`timestamp`</span>: bounded by a user-supplied timestamp.</li>
+<li><span markdown="span">`specific-offsets`</span>: bounded by user-supplied specific offsets for each partition.</li>
+</ul>
+
+If the config option `scan.bounded.mode` is not set, the table is unbounded by default.
+
+If `timestamp` is specified, another config option `scan.bounded.timestamp-millis` is required to specify a specific bounded timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.
+
+If `specific-offsets` is specified, another config option `scan.bounded.specific-offsets` is required to specify specific bounded offsets for each partition,
+e.g. an option value `partition:0,offset:42;partition:1,offset:300` indicates offset `42` for partition `0` and offset `300` for partition `1`. If an offset for a partition is not provided, the connector will not consume from that partition.
+
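+For example, the following DDL (topic, connection properties, and format are placeholders) reads from the earliest offset and stops once record timestamps reach the given epoch milliseconds:
+
+```sql
+CREATE TABLE KafkaBoundedTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+) WITH (
+  'connector' = 'kafka',
+  'topic' = 'user_behavior',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'format' = 'csv',
+  'scan.startup.mode' = 'earliest-offset',
+  'scan.bounded.mode' = 'timestamp',
+  'scan.bounded.timestamp-millis' = '1640966400000'
+);
+```
+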
### CDC Changelog Source
Flink natively supports Kafka as a CDC changelog source. If messages in a Kafka topic are change event captured from other databases using a CDC tool, you can use the corresponding Flink CDC format to interpret the messages as INSERT/UPDATE/DELETE statements into a Flink SQL table.
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 1b327504587..7a17b1ff6f9 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -233,4 +233,9 @@ public class KafkaSource<OUT>
KafkaSubscriber getKafkaSubscriber() {
return subscriber;
}
+
+ @VisibleForTesting
+ OffsetsInitializer getStoppingOffsetsInitializer() {
+ return stoppingOffsetsInitializer;
+ }
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/BoundedMode.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/BoundedMode.java
new file mode 100644
index 00000000000..beb2306b581
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/BoundedMode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.Internal;
+
+/** End modes for the Kafka Consumer. */
+@Internal
+public enum BoundedMode {
+
+ /** Do not end consuming. */
+ UNBOUNDED,
+
+ /**
+ * End from committed offsets in ZK / Kafka brokers of a specific consumer group. This is
+ * evaluated at the start of consumption from a given partition.
+ */
+ GROUP_OFFSETS,
+
+ /**
+ * End from the latest offset. This is evaluated at the start of consumption from a given
+ * partition.
+ */
+ LATEST,
+
+ /** End from user-supplied timestamp for each partition. */
+ TIMESTAMP,
+
+ /**
+ * End from user-supplied specific offsets for each partition. If an offset for a partition is
+ * not provided it will not consume from that partition.
+ */
+ SPECIFIC_OFFSETS;
+}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
index 1150ab4b1c3..a6cdbcedc96 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
@@ -151,6 +151,12 @@ public class KafkaConnectorOptions {
.defaultValue(ScanStartupMode.GROUP_OFFSETS)
.withDescription("Startup mode for Kafka consumer.");
+ public static final ConfigOption<ScanBoundedMode> SCAN_BOUNDED_MODE =
+ ConfigOptions.key("scan.bounded.mode")
+ .enumType(ScanBoundedMode.class)
+ .defaultValue(ScanBoundedMode.UNBOUNDED)
+ .withDescription("Bounded mode for Kafka consumer.");
+
public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS =
ConfigOptions.key("scan.startup.specific-offsets")
.stringType()
@@ -158,6 +164,13 @@ public class KafkaConnectorOptions {
.withDescription(
"Optional offsets used in case of \"specific-offsets\" startup mode");
+ public static final ConfigOption<String> SCAN_BOUNDED_SPECIFIC_OFFSETS =
+ ConfigOptions.key("scan.bounded.specific-offsets")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional offsets used in case of \"specific-offsets\" bounded mode");
+
public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
ConfigOptions.key("scan.startup.timestamp-millis")
.longType()
@@ -165,6 +178,13 @@ public class KafkaConnectorOptions {
.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");
+ public static final ConfigOption<Long> SCAN_BOUNDED_TIMESTAMP_MILLIS =
+ ConfigOptions.key("scan.bounded.timestamp-millis")
+ .longType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional timestamp used in case of \"timestamp\" bounded mode");
+
public static final ConfigOption<Duration> SCAN_TOPIC_PARTITION_DISCOVERY =
ConfigOptions.key("scan.topic-partition-discovery.interval")
.durationType()
@@ -291,5 +311,45 @@ public class KafkaConnectorOptions {
}
}
+ /** Bounded mode for the Kafka consumer, see {@link #SCAN_BOUNDED_MODE}. */
+ public enum ScanBoundedMode implements DescribedEnum {
+ UNBOUNDED("unbounded", text("Do not stop consuming.")),
+ LATEST_OFFSET(
+ "latest-offset",
+ text(
+ "Bounded by latest offsets. This is evaluated at the start of consumption"
+ + " from a given partition.")),
+ GROUP_OFFSETS(
+ "group-offsets",
+ text(
+ "Bounded by committed offsets in ZooKeeper / Kafka brokers of a specific"
+ + " consumer group. This is evaluated at the start of consumption"
+ + " from a given partition.")),
+ TIMESTAMP("timestamp", text("Bounded by a user-supplied timestamp.")),
+ SPECIFIC_OFFSETS(
+ "specific-offsets",
+ text(
+ "Bounded by user-supplied specific offsets for each partition. If an offset"
+ + " for a partition is not provided it will not consume from that"
+ + " partition."));
+ private final String value;
+ private final InlineElement description;
+
+ ScanBoundedMode(String value, InlineElement description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return description;
+ }
+ }
+
private KafkaConnectorOptions() {}
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
index fb848036e88..ef70644e5d9 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
@@ -24,10 +24,12 @@ import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ValueFieldsStrategy;
import org.apache.flink.table.api.TableException;
@@ -56,6 +58,9 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
@@ -101,6 +106,7 @@ class KafkaConnectorOptionsUtil {
public static void validateTableSourceOptions(ReadableConfig tableOptions) {
validateSourceTopic(tableOptions);
validateScanStartupMode(tableOptions);
+ validateScanBoundedMode(tableOptions);
}
public static void validateTableSinkOptions(ReadableConfig tableOptions) {
@@ -183,6 +189,49 @@ class KafkaConnectorOptionsUtil {
});
}
+ private static void validateScanBoundedMode(ReadableConfig tableOptions) {
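+ // 'timestamp' and 'specific-offsets' need a companion option;
+ // specific offsets additionally require a single, parseable topic.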
+ tableOptions
+ .getOptional(SCAN_BOUNDED_MODE)
+ .ifPresent(
+ mode -> {
+ switch (mode) {
+ case TIMESTAMP:
+ if (!tableOptions
+ .getOptional(SCAN_BOUNDED_TIMESTAMP_MILLIS)
+ .isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "'%s' is required in '%s' bounded mode"
+ + " but missing.",
+ SCAN_BOUNDED_TIMESTAMP_MILLIS.key(),
+ ScanBoundedMode.TIMESTAMP));
+ }
+
+ break;
+ case SPECIFIC_OFFSETS:
+ if (!tableOptions
+ .getOptional(SCAN_BOUNDED_SPECIFIC_OFFSETS)
+ .isPresent()) {
+ throw new ValidationException(
+ String.format(
+ "'%s' is required in '%s' bounded mode"
+ + " but missing.",
+ SCAN_BOUNDED_SPECIFIC_OFFSETS.key(),
+ ScanBoundedMode.SPECIFIC_OFFSETS));
+ }
+ if (!isSingleTopic(tableOptions)) {
+ throw new ValidationException(
+ "Currently Kafka source only supports specific offset for single topic.");
+ }
+ String specificOffsets =
+ tableOptions.get(SCAN_BOUNDED_SPECIFIC_OFFSETS);
+ parseSpecificOffsets(
+ specificOffsets, SCAN_BOUNDED_SPECIFIC_OFFSETS.key());
+ break;
+ }
+ });
+ }
+
private static void validateSinkPartitioner(ReadableConfig tableOptions) {
tableOptions
.getOptional(SINK_PARTITIONER)
@@ -241,6 +290,23 @@ class KafkaConnectorOptionsUtil {
return options;
}
+ public static BoundedOptions getBoundedOptions(ReadableConfig tableOptions) {
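+ // Companion options are read only for the bounded modes that actually use them.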
+ final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+ final BoundedMode boundedMode =
+ KafkaConnectorOptionsUtil.fromOption(tableOptions.get(SCAN_BOUNDED_MODE));
+ if (boundedMode == BoundedMode.SPECIFIC_OFFSETS) {
+ buildBoundedOffsets(tableOptions, tableOptions.get(TOPIC).get(0), specificOffsets);
+ }
+
+ final BoundedOptions options = new BoundedOptions();
+ options.boundedMode = boundedMode;
+ options.specificOffsets = specificOffsets;
+ if (boundedMode == BoundedMode.TIMESTAMP) {
+ options.boundedTimestampMillis = tableOptions.get(SCAN_BOUNDED_TIMESTAMP_MILLIS);
+ }
+ return options;
+ }
+
private static void buildSpecificOffsets(
ReadableConfig tableOptions,
String topic,
@@ -256,6 +322,22 @@ class KafkaConnectorOptionsUtil {
});
}
+ public static void buildBoundedOffsets(
+ ReadableConfig tableOptions,
+ String topic,
+ Map<KafkaTopicPartition, Long> specificOffsets) {
+ String specificOffsetsEndOpt = tableOptions.get(SCAN_BOUNDED_SPECIFIC_OFFSETS);
+ final Map<Integer, Long> offsetMap =
+ parseSpecificOffsets(specificOffsetsEndOpt, SCAN_BOUNDED_SPECIFIC_OFFSETS.key());
+
+ offsetMap.forEach(
+ (partition, offset) -> {
+ final KafkaTopicPartition topicPartition =
+ new KafkaTopicPartition(topic, partition);
+ specificOffsets.put(topicPartition, offset);
+ });
+ }
+
/**
* Returns the {@link StartupMode} of Kafka Consumer by passed-in table-specific {@link
* ScanStartupMode}.
@@ -279,6 +361,29 @@ class KafkaConnectorOptionsUtil {
}
}
+ /**
+ * Returns the {@link BoundedMode} of Kafka Consumer by passed-in table-specific {@link
+ * ScanBoundedMode}.
+ */
+ private static BoundedMode fromOption(ScanBoundedMode scanBoundedMode) {
+ switch (scanBoundedMode) {
+ case UNBOUNDED:
+ return BoundedMode.UNBOUNDED;
+ case LATEST_OFFSET:
+ return BoundedMode.LATEST;
+ case GROUP_OFFSETS:
+ return BoundedMode.GROUP_OFFSETS;
+ case TIMESTAMP:
+ return BoundedMode.TIMESTAMP;
+ case SPECIFIC_OFFSETS:
+ return BoundedMode.SPECIFIC_OFFSETS;
+
+ default:
+ throw new TableException(
+ "Unsupported bounded mode. Validator should have checked that.");
+ }
+ }
+
public static Properties getKafkaProperties(Map<String, String> tableOptions) {
final Properties kafkaProperties = new Properties();
@@ -320,15 +425,15 @@ class KafkaConnectorOptionsUtil {
}
/**
- * Parses SpecificOffsets String to Map.
+ * Parses a specificOffsets string into a map.
*
- * <p>SpecificOffsets String format was given as following:
+ * <p>The specificOffsets string format is as follows:
*
* <pre>
* scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300
* </pre>
*
- * @return SpecificOffsets with Map format, key is partition, and value is offset
+ * @return specificOffsets as a Map, where the key is the partition and the value is the offset
*/
public static Map<Integer, Long> parseSpecificOffsets(
String specificOffsetsStr, String optionKey) {
@@ -581,5 +686,12 @@ class KafkaConnectorOptionsUtil {
public long startupTimestampMillis;
}
+ /** Kafka bounded options. * */
+ public static class BoundedOptions {
+ public BoundedMode boundedMode;
+ public Map<KafkaTopicPartition, Long> specificOffsets;
+ public long boundedTimestampMillis;
+ }
+
private KafkaConnectorOptionsUtil() {}
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
index d6bf27a104b..c963da76206 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
@@ -25,12 +25,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
@@ -149,6 +151,21 @@ public class KafkaDynamicSource
*/
protected final long startupTimestampMillis;
+ /** The bounded mode for the contained consumer (default is an unbounded data stream). */
+ protected final BoundedMode boundedMode;
+
+ /**
+ * Specific end offsets; only relevant when bounded mode is {@link
+ * BoundedMode#SPECIFIC_OFFSETS}.
+ */
+ protected final Map<KafkaTopicPartition, Long> specificBoundedOffsets;
+
+ /**
+ * The bounded timestamp to locate partition offsets; only relevant when bounded mode is {@link
+ * BoundedMode#TIMESTAMP}.
+ */
+ protected final long boundedTimestampMillis;
+
/** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */
protected final boolean upsertMode;
@@ -167,6 +184,9 @@ public class KafkaDynamicSource
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
+ BoundedMode boundedMode,
+ Map<KafkaTopicPartition, Long> specificBoundedOffsets,
+ long boundedTimestampMillis,
boolean upsertMode,
String tableIdentifier) {
// Format attributes
@@ -200,6 +220,12 @@ public class KafkaDynamicSource
Preconditions.checkNotNull(
specificStartupOffsets, "Specific offsets must not be null.");
this.startupTimestampMillis = startupTimestampMillis;
+ this.boundedMode =
+ Preconditions.checkNotNull(boundedMode, "Bounded mode must not be null.");
+ this.specificBoundedOffsets =
+ Preconditions.checkNotNull(
+ specificBoundedOffsets, "Specific bounded offsets must not be null.");
+ this.boundedTimestampMillis = boundedTimestampMillis;
this.upsertMode = upsertMode;
this.tableIdentifier = tableIdentifier;
}
@@ -314,6 +340,9 @@ public class KafkaDynamicSource
startupMode,
specificStartupOffsets,
startupTimestampMillis,
+ boundedMode,
+ specificBoundedOffsets,
+ boundedTimestampMillis,
upsertMode,
tableIdentifier);
copy.producedDataType = producedDataType;
@@ -350,6 +379,9 @@ public class KafkaDynamicSource
&& startupMode == that.startupMode
&& Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
&& startupTimestampMillis == that.startupTimestampMillis
+ && boundedMode == that.boundedMode
+ && Objects.equals(specificBoundedOffsets, that.specificBoundedOffsets)
+ && boundedTimestampMillis == that.boundedTimestampMillis
&& Objects.equals(upsertMode, that.upsertMode)
&& Objects.equals(tableIdentifier, that.tableIdentifier)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy);
@@ -363,8 +395,8 @@ public class KafkaDynamicSource
physicalDataType,
keyDecodingFormat,
valueDecodingFormat,
- keyProjection,
- valueProjection,
+ Arrays.hashCode(keyProjection),
+ Arrays.hashCode(valueProjection),
keyPrefix,
topics,
topicPattern,
@@ -372,6 +404,9 @@ public class KafkaDynamicSource
startupMode,
specificStartupOffsets,
startupTimestampMillis,
+ boundedMode,
+ specificBoundedOffsets,
+ boundedTimestampMillis,
upsertMode,
tableIdentifier,
watermarkStrategy);
@@ -427,6 +462,30 @@ public class KafkaDynamicSource
break;
}
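+
+ // Map the configured bounded mode onto the stopping offsets initializer of the source.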
+ switch (boundedMode) {
+ case UNBOUNDED:
+ kafkaSourceBuilder.setUnbounded(new NoStoppingOffsetsInitializer());
+ break;
+ case LATEST:
+ kafkaSourceBuilder.setBounded(OffsetsInitializer.latest());
+ break;
+ case GROUP_OFFSETS:
+ kafkaSourceBuilder.setBounded(OffsetsInitializer.committedOffsets());
+ break;
+ case SPECIFIC_OFFSETS:
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ specificBoundedOffsets.forEach(
+ (tp, offset) ->
+ offsets.put(
+ new TopicPartition(tp.getTopic(), tp.getPartition()),
+ offset));
+ kafkaSourceBuilder.setBounded(OffsetsInitializer.offsets(offsets));
+ break;
+ case TIMESTAMP:
+ kafkaSourceBuilder.setBounded(OffsetsInitializer.timestamp(boundedTimestampMillis));
+ break;
+ }
+
kafkaSourceBuilder
.setProperties(properties)
.setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer));
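For context, the bounded switch above feeds the same `setBounded(...)` hook that DataStream users call on `KafkaSourceBuilder` directly. A minimal sketch of the equivalent bounded source, assuming placeholder broker, topic, and timestamp values:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder broker
                .setTopics("user_behavior") // placeholder topic
                .setStartingOffsets(OffsetsInitializer.earliest())
                // stop at the offsets resolved for the given epoch timestamp (ms)
                .setBounded(OffsetsInitializer.timestamp(1640966400000L))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
```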
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 5ac146d7197..48c00918aa1 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -27,9 +27,11 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.DecodingFormat;
@@ -69,6 +71,9 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
@@ -85,6 +90,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
@@ -143,6 +149,9 @@ public class KafkaDynamicTableFactory
options.add(DELIVERY_GUARANTEE);
options.add(TRANSACTIONAL_ID_PREFIX);
options.add(SINK_SEMANTIC);
+ options.add(SCAN_BOUNDED_MODE);
+ options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
+ options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
return options;
}
@@ -187,6 +196,8 @@ public class KafkaDynamicTableFactory
final StartupOptions startupOptions = getStartupOptions(tableOptions);
+ final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);
+
final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
// add topic-partition discovery
@@ -217,6 +228,9 @@ public class KafkaDynamicTableFactory
startupOptions.startupMode,
startupOptions.specificOffsets,
startupOptions.startupTimestampMillis,
+ boundedOptions.boundedMode,
+ boundedOptions.specificOffsets,
+ boundedOptions.boundedTimestampMillis,
context.getObjectIdentifier().asSummaryString());
}
@@ -378,6 +392,9 @@ public class KafkaDynamicTableFactory
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
+ BoundedMode boundedMode,
+ Map<KafkaTopicPartition, Long> specificEndOffsets,
+ long endTimestampMillis,
String tableIdentifier) {
return new KafkaDynamicSource(
physicalDataType,
@@ -392,6 +409,9 @@ public class KafkaDynamicTableFactory
startupMode,
specificStartupOffsets,
startupTimestampMillis,
+ boundedMode,
+ specificEndOffsets,
+ endTimestampMillis,
false,
tableIdentifier);
}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index 8682e405121..254e1bf9852 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -141,6 +142,9 @@ public class UpsertKafkaDynamicTableFactory
earliest,
Collections.emptyMap(),
0,
+ BoundedMode.UNBOUNDED,
+ Collections.emptyMap(),
+ 0,
true,
context.getObjectIdentifier().asSummaryString());
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java
index 572b77d0787..e95f05babe1 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.kafka.source;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
import java.util.Collection;
@@ -50,4 +51,9 @@ public class KafkaSourceTestUtils {
public static Configuration getKafkaSourceConfiguration(KafkaSource<?> kafkaSource) {
return kafkaSource.getConfiguration();
}
+
+ /** Get stopping offsets initializer. */
+ public static OffsetsInitializer getStoppingOffsetsInitializer(KafkaSource<?> kafkaSource) {
+ return kafkaSource.getStoppingOffsetsInitializer();
+ }
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 0b908abfa5f..a0b74a5b763 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
@@ -30,6 +31,7 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
@@ -38,6 +40,7 @@ import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroSer
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
@@ -73,7 +76,9 @@ import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
@@ -82,6 +87,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -91,6 +97,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.regex.Pattern;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -103,7 +110,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Abstract test base for {@link KafkaDynamicTableFactory}. */
+/** Tests for {@link KafkaDynamicTableFactory}. */
@ExtendWith(TestLoggerExtension.class)
public class KafkaDynamicTableFactoryTest {
@@ -425,6 +432,220 @@ public class KafkaDynamicTableFactoryTest {
}
}
+ @Test
+ public void testBoundedSpecificOffsetsValidate() {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSourceOptions(),
+ options -> {
+ options.put(
+ KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(),
+ "specific-offsets");
+ });
+ assertThatThrownBy(() -> createTableSource(SCHEMA, modifiedOptions))
+ .cause()
+ .hasMessageContaining(
+ "'scan.bounded.specific-offsets' is required in 'specific-offsets' bounded mode but missing.");
+ }
+
+ @Test
+ public void testBoundedSpecificOffsets() {
+ testBoundedOffsets(
+ "specific-offsets",
+ options -> {
+ options.put("scan.bounded.specific-offsets", "partition:0,offset:2");
+ },
+ source -> {
+ assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+ OffsetsInitializer offsetsInitializer =
+ KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+ TopicPartition partition = new TopicPartition(TOPIC, 0);
+ Map<TopicPartition, Long> partitionOffsets =
+ offsetsInitializer.getPartitionOffsets(
+ Collections.singletonList(partition),
+ MockPartitionOffsetsRetriever.noInteractions());
+ assertThat(partitionOffsets)
+ .containsOnlyKeys(partition)
+ .containsEntry(partition, 2L);
+ });
+ }
+
+ @Test
+ public void testBoundedLatestOffset() {
+ testBoundedOffsets(
+ "latest-offset",
+ options -> {},
+ source -> {
+ assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+ OffsetsInitializer offsetsInitializer =
+ KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+ TopicPartition partition = new TopicPartition(TOPIC, 0);
+ Map<TopicPartition, Long> partitionOffsets =
+ offsetsInitializer.getPartitionOffsets(
+ Collections.singletonList(partition),
+ MockPartitionOffsetsRetriever.noInteractions());
+ assertThat(partitionOffsets)
+ .containsOnlyKeys(partition)
+ .containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET);
+ });
+ }
+
+ @Test
+ public void testBoundedGroupOffsets() {
+ testBoundedOffsets(
+ "group-offsets",
+ options -> {},
+ source -> {
+ assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+ OffsetsInitializer offsetsInitializer =
+ KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+ TopicPartition partition = new TopicPartition(TOPIC, 0);
+ Map<TopicPartition, Long> partitionOffsets =
+ offsetsInitializer.getPartitionOffsets(
+ Collections.singletonList(partition),
+ MockPartitionOffsetsRetriever.noInteractions());
+ assertThat(partitionOffsets)
+ .containsOnlyKeys(partition)
+ .containsEntry(partition, KafkaPartitionSplit.COMMITTED_OFFSET);
+ });
+ }
+
+ @Test
+ public void testBoundedTimestamp() {
+ testBoundedOffsets(
+ "timestamp",
+ options -> {
+ options.put("scan.bounded.timestamp-millis", "1");
+ },
+ source -> {
+ assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+ OffsetsInitializer offsetsInitializer =
+ KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+ TopicPartition partition = new TopicPartition(TOPIC, 0);
+ long offsetForTimestamp = 123L;
+ Map<TopicPartition, Long> partitionOffsets =
+ offsetsInitializer.getPartitionOffsets(
+ Collections.singletonList(partition),
+ MockPartitionOffsetsRetriever.timestampAndEnd(
+ partitions -> {
+ assertThat(partitions)
+ .containsOnlyKeys(partition)
+ .containsEntry(partition, 1L);
+ Map<TopicPartition, OffsetAndTimestamp> result =
+ new HashMap<>();
+ result.put(
+ partition,
+ new OffsetAndTimestamp(
+ offsetForTimestamp, 1L));
+ return result;
+ },
+ partitions -> {
+ Map<TopicPartition, Long> result = new HashMap<>();
+ result.put(
+ partition,
+ // the end offset is bigger than the one derived from the
+ // timestamp, to make sure the timestamp-based offset is used
+ offsetForTimestamp + 1000L);
+ return result;
+ }));
+ assertThat(partitionOffsets)
+ .containsOnlyKeys(partition)
+ .containsEntry(partition, offsetForTimestamp);
+ });
+ }
+
+ private void testBoundedOffsets(
+ String boundedMode,
+ Consumer<Map<String, String>> optionsConfig,
+ Consumer<KafkaSource<?>> validator) {
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSourceOptions(),
+ options -> {
+ options.put(KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(), boundedMode);
+ optionsConfig.accept(options);
+ });
+ final DynamicTableSource tableSource = createTableSource(SCHEMA, modifiedOptions);
+ assertThat(tableSource).isInstanceOf(KafkaDynamicSource.class);
+ ScanTableSource.ScanRuntimeProvider provider =
+ ((KafkaDynamicSource) tableSource)
+ .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+ assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+ final KafkaSource<?> kafkaSource = assertKafkaSource(provider);
+ validator.accept(kafkaSource);
+ }
+
+ private interface OffsetsRetriever
+ extends Function<Collection<TopicPartition>, Map<TopicPartition, Long>> {}
+
+ private interface TimestampOffsetsRetriever
+ extends Function<Map<TopicPartition, Long>, Map<TopicPartition, OffsetAndTimestamp>> {}
+
+ private static final class MockPartitionOffsetsRetriever
+ implements OffsetsInitializer.PartitionOffsetsRetriever {
+
+ public static final OffsetsRetriever UNSUPPORTED_RETRIEVAL =
+ partitions -> {
+ throw new UnsupportedOperationException(
+ "The method was not supposed to be called");
+ };
+ private final OffsetsRetriever committedOffsets;
+ private final OffsetsRetriever endOffsets;
+ private final OffsetsRetriever beginningOffsets;
+ private final TimestampOffsetsRetriever offsetsForTimes;
+
+ static MockPartitionOffsetsRetriever noInteractions() {
+ return new MockPartitionOffsetsRetriever(
+ UNSUPPORTED_RETRIEVAL,
+ UNSUPPORTED_RETRIEVAL,
+ UNSUPPORTED_RETRIEVAL,
+ partitions -> {
+ throw new UnsupportedOperationException(
+ "The method was not supposed to be called");
+ });
+ }
+
+ static MockPartitionOffsetsRetriever timestampAndEnd(
+ TimestampOffsetsRetriever retriever, OffsetsRetriever endOffsets) {
+ return new MockPartitionOffsetsRetriever(
+ UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, retriever);
+ }
+
+ private MockPartitionOffsetsRetriever(
+ OffsetsRetriever committedOffsets,
+ OffsetsRetriever endOffsets,
+ OffsetsRetriever beginningOffsets,
+ TimestampOffsetsRetriever offsetsForTimes) {
+ this.committedOffsets = committedOffsets;
+ this.endOffsets = endOffsets;
+ this.beginningOffsets = beginningOffsets;
+ this.offsetsForTimes = offsetsForTimes;
+ }
+
+ @Override
+ public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
+ return committedOffsets.apply(partitions);
+ }
+
+ @Override
+ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+ return endOffsets.apply(partitions);
+ }
+
+ @Override
+ public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+ return beginningOffsets.apply(partitions);
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
+ Map<TopicPartition, Long> timestampsToSearch) {
+ return offsetsForTimes.apply(timestampsToSearch);
+ }
+ }
+
@Test
public void testTableSink() {
final Map<String, String> modifiedOptions =
@@ -974,6 +1195,9 @@ public class KafkaDynamicTableFactoryTest {
startupMode,
specificStartupOffsets,
startupTimestampMillis,
+ BoundedMode.UNBOUNDED,
+ Collections.emptyMap(),
+ 0,
false,
FactoryMocks.IDENTIFIER.asSummaryString());
}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 975e5cd4375..515526f935e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.utils.EncodingUtils;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -40,6 +41,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.time.Duration;
+import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
@@ -179,6 +181,129 @@ public class KafkaTableITCase extends KafkaTableTestBase {
deleteTestTopic(topic);
}
+ @Test
+ public void testKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
+ // we always use a different topic name for each parameterized run,
+ // in order to make sure the topic can be created.
+ final String topic = "bounded_" + format + "_" + UUID.randomUUID();
+ createTestTopic(topic, 1, 1);
+
+ // ---------- Produce an event time stream into Kafka -------------------
+ String groupId = getStandardProps().getProperty("group.id");
+ String bootstraps = getBootstrapServers();
+
+ final String createTable =
+ String.format(
+ "CREATE TABLE kafka (\n"
+ + " `user_id` INT,\n"
+ + " `item_id` INT,\n"
+ + " `behavior` STRING\n"
+ + ") WITH (\n"
+ + " 'connector' = '%s',\n"
+ + " 'topic' = '%s',\n"
+ + " 'properties.bootstrap.servers' = '%s',\n"
+ + " 'properties.group.id' = '%s',\n"
+ + " 'scan.startup.mode' = 'earliest-offset',\n"
+ + " 'scan.bounded.mode' = 'specific-offsets',\n"
+ + " 'scan.bounded.specific-offsets' = 'partition:0,offset:2',\n"
+ + " %s\n"
+ + ")\n",
+ KafkaDynamicTableFactory.IDENTIFIER,
+ topic,
+ bootstraps,
+ groupId,
+ formatOptions());
+
+ tEnv.executeSql(createTable);
+
+ List<Row> values =
+ Arrays.asList(
+ Row.of(1, 1102, "behavior 1"),
+ Row.of(2, 1103, "behavior 2"),
+ Row.of(3, 1104, "behavior 3"));
+ tEnv.fromValues(values).insertInto("kafka").execute().await();
+
+ // ---------- Consume stream from Kafka -------------------
+
+ List<Row> results = new ArrayList<>();
+ try (CloseableIterator<Row> resultsItr =
+ tEnv.sqlQuery("SELECT * from kafka").execute().collect()) {
+ while (resultsItr.hasNext()) {
+ results.add(resultsItr.next());
+ }
+ }
+
+ assertThat(results)
+ .containsExactly(Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, "behavior 2"));
+
+ // ------------- cleanup -------------------
+
+ deleteTestTopic(topic);
+ }
+
+ @Test
+ public void testKafkaSourceSinkWithBoundedTimestamp() throws Exception {
+ // we always use a different topic name for each parameterized run,
+ // in order to make sure the topic can be created.
+ final String topic = "bounded_" + format + "_" + UUID.randomUUID();
+ createTestTopic(topic, 1, 1);
+
+ // ---------- Produce an event time stream into Kafka -------------------
+ String groupId = getStandardProps().getProperty("group.id");
+ String bootstraps = getBootstrapServers();
+
+ final String createTable =
+ String.format(
+ "CREATE TABLE kafka (\n"
+ + " `user_id` INT,\n"
+ + " `item_id` INT,\n"
+ + " `behavior` STRING,\n"
+ + " `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
+ + ") WITH (\n"
+ + " 'connector' = '%s',\n"
+ + " 'topic' = '%s',\n"
+ + " 'properties.bootstrap.servers' = '%s',\n"
+ + " 'properties.group.id' = '%s',\n"
+ + " 'scan.startup.mode' = 'earliest-offset',\n"
+ + " 'scan.bounded.mode' = 'timestamp',\n"
+ + " 'scan.bounded.timestamp-millis' = '5',\n"
+ + " %s\n"
+ + ")\n",
+ KafkaDynamicTableFactory.IDENTIFIER,
+ topic,
+ bootstraps,
+ groupId,
+ formatOptions());
+
+ tEnv.executeSql(createTable);
+
+ List<Row> values =
+ Arrays.asList(
+ Row.of(1, 1102, "behavior 1", Instant.ofEpochMilli(0L)),
+ Row.of(2, 1103, "behavior 2", Instant.ofEpochMilli(3L)),
+ Row.of(3, 1104, "behavior 3", Instant.ofEpochMilli(7L)));
+ tEnv.fromValues(values).insertInto("kafka").execute().await();
+
+ // ---------- Consume stream from Kafka -------------------
+
+ List<Row> results = new ArrayList<>();
+ try (CloseableIterator<Row> resultsItr =
+ tEnv.sqlQuery("SELECT * from kafka").execute().collect()) {
+ while (resultsItr.hasNext()) {
+ results.add(resultsItr.next());
+ }
+ }
+
+ assertThat(results)
+ .containsExactly(
+ Row.of(1, 1102, "behavior 1", Instant.ofEpochMilli(0L)),
+ Row.of(2, 1103, "behavior 2", Instant.ofEpochMilli(3L)));
+
+ // ------------- cleanup -------------------
+
+ deleteTestTopic(topic);
+ }
+
@Test
public void testKafkaTableWithMultipleTopics() throws Exception {
// ---------- create source and sink tables -------------------
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index 79884ad0929..5caaaa0acbc 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.table.api.DataTypes;
@@ -609,6 +610,9 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
StartupMode.EARLIEST,
Collections.emptyMap(),
0,
+ BoundedMode.UNBOUNDED,
+ Collections.emptyMap(),
+ 0,
true,
FactoryMocks.IDENTIFIER.asSummaryString());
}