You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2023/01/31 22:53:20 UTC

[flink] branch master updated: [FLINK-24456][Connectors / Kafka,Table SQL / Ecosystem] Support bounded offset in the Kafka table connector

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e8a99c12ce [FLINK-24456][Connectors / Kafka,Table SQL / Ecosystem] Support bounded offset in the Kafka table connector
9e8a99c12ce is described below

commit 9e8a99c12ce939418086fa9555c1c4f74bcf6b59
Author: shihong90 <25...@qq.com>
AuthorDate: Thu Dec 2 20:42:17 2021 +0800

    [FLINK-24456][Connectors / Kafka,Table SQL / Ecosystem] Support bounded offset in the Kafka table connector
    
    Co-authored-by: shihong90 <25...@qq.com>
    
    This closes #21808
---
 docs/content.zh/docs/connectors/table/kafka.md     |  44 +++-
 docs/content/docs/connectors/table/kafka.md        |  45 +++-
 .../flink/connector/kafka/source/KafkaSource.java  |   5 +
 .../connectors/kafka/config/BoundedMode.java       |  49 +++++
 .../kafka/table/KafkaConnectorOptions.java         |  60 ++++++
 .../kafka/table/KafkaConnectorOptionsUtil.java     | 118 ++++++++++-
 .../connectors/kafka/table/KafkaDynamicSource.java |  63 +++++-
 .../kafka/table/KafkaDynamicTableFactory.java      |  20 ++
 .../table/UpsertKafkaDynamicTableFactory.java      |   4 +
 .../kafka/source/KafkaSourceTestUtils.java         |   6 +
 .../kafka/table/KafkaDynamicTableFactoryTest.java  | 228 ++++++++++++++++++++-
 .../connectors/kafka/table/KafkaTableITCase.java   | 125 +++++++++++
 .../table/UpsertKafkaDynamicTableFactoryTest.java  |   4 +
 13 files changed, 762 insertions(+), 9 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/kafka.md b/docs/content.zh/docs/connectors/table/kafka.md
index b57da402ebd..408cb1a2f05 100644
--- a/docs/content.zh/docs/connectors/table/kafka.md
+++ b/docs/content.zh/docs/connectors/table/kafka.md
@@ -290,7 +290,7 @@ CREATE TABLE KafkaTable (
       <td><h5>scan.startup.mode</h5></td>
       <td>可选</td>
       <td style="word-wrap: break-word;">group-offsets</td>
-      <td>String</td>
+      <td>Enum</td>
       <td>Kafka consumer 的启动模式。有效值为:<code>'earliest-offset'</code>,<code>'latest-offset'</code>,<code>'group-offsets'</code>,<code>'timestamp'</code> 和 <code>'specific-offsets'</code>。
        请参阅下方 <a href="#起始消费位点">起始消费位点</a> 以获取更多细节。</td>
     </tr>
@@ -309,6 +309,32 @@ CREATE TABLE KafkaTable (
       <td>Long</td>
       <td>在使用 <code>'timestamp'</code> 启动模式时指定启动的时间戳(单位毫秒)。</td>
     </tr>
+    <tr>
+      <td><h5>scan.bounded.mode</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">unbounded</td>
+      <td>Enum</td>
+      <td>Bounded mode for Kafka consumer, valid values are <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>.
+       See the following <a href="#bounded-ending-position">Bounded Ending Position</a> for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.bounded.specific-offsets</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify offsets for each partition in case of <code>'specific-offsets'</code> bounded mode, e.g. <code>'partition:0,offset:42;partition:1,offset:300'. If an offset
+       for a partition is not provided it will not consume from that partition.</code>.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>scan.bounded.timestamp-millis</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Long</td>
+      <td>End at the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> bounded mode.</td>
+    </tr>
     <tr>
       <td><h5>scan.topic-partition-discovery.interval</h5></td>
       <td>可选</td>
@@ -485,6 +511,22 @@ ROW<`version` INT, `behavior` STRING>
 如果使用了 `specific-offsets`,必须使用另外一个配置项 `scan.startup.specific-offsets` 来为每个 partition 指定起始偏移量,
 例如,选项值 `partition:0,offset:42;partition:1,offset:300` 表示 partition `0` 从偏移量 `42` 开始,partition `1` 从偏移量 `300` 开始。
 
+### Bounded Ending Position
+
+The config option `scan.bounded.mode` specifies the bounded mode for Kafka consumer. The valid enumerations are:
+<ul>
+<li><span markdown="span">`group-offsets`</span>: bounded by committed offsets in ZooKeeper / Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.</li>
+<li><span markdown="span">`latest-offset`</span>: bounded by latest offsets. This is evaluated at the start of consumption from a given partition.</li>
+<li><span markdown="span">`timestamp`</span>: bounded by a user-supplied timestamp.</li>
+<li><span markdown="span">`specific-offsets`</span>: bounded by user-supplied specific offsets for each partition.</li>
+</ul>
+
+If config option value `scan.bounded.mode` is not set the default is an unbounded table.
+
+If `timestamp` is specified, another config option `scan.bounded.timestamp-millis` is required to specify a specific bounded timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.
+
+If `specific-offsets` is specified, another config option `scan.bounded.specific-offsets` is required to specify specific bounded offsets for each partition,
+e.g. an option value `partition:0,offset:42;partition:1,offset:300` indicates offset `42` for partition `0` and offset `300` for partition `1`. If an offset for a partition is not provided it will not consume from that partition.
 
 ### CDC 变更日志(Changelog) Source
 
diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md
index a5d85989de1..3c9e739c28c 100644
--- a/docs/content/docs/connectors/table/kafka.md
+++ b/docs/content/docs/connectors/table/kafka.md
@@ -313,7 +313,7 @@ Connector Options
       <td>optional</td>
       <td>yes</td>
       <td style="word-wrap: break-word;">group-offsets</td>
-      <td>String</td>
+      <td>Enum</td>
       <td>Startup mode for Kafka consumer, valid values are <code>'earliest-offset'</code>, <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>.
        See the following <a href="#start-reading-position">Start Reading Position</a> for more details.</td>
     </tr>
@@ -334,6 +334,32 @@ Connector Options
       <td>Long</td>
       <td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td>
     </tr>
+    <tr>
+      <td><h5>scan.bounded.mode</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">unbounded</td>
+      <td>Enum</td>
+      <td>Bounded mode for Kafka consumer, valid values are <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>.
+       See the following <a href="#bounded-ending-position">Bounded Ending Position</a> for more details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.bounded.specific-offsets</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify offsets for each partition in case of <code>'specific-offsets'</code> bounded mode, e.g. <code>'partition:0,offset:42;partition:1,offset:300'. If an offset
+       for a partition is not provided it will not consume from that partition.</code>.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>scan.bounded.timestamp-millis</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Long</td>
+      <td>End at the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> bounded mode.</td>
+    </tr>
     <tr>
       <td><h5>scan.topic-partition-discovery.interval</h5></td>
       <td>optional</td>
@@ -535,6 +561,23 @@ If `timestamp` is specified, another config option `scan.startup.timestamp-milli
 If `specific-offsets` is specified, another config option `scan.startup.specific-offsets` is required to specify specific startup offsets for each partition,
 e.g. an option value `partition:0,offset:42;partition:1,offset:300` indicates offset `42` for partition `0` and offset `300` for partition `1`.
 
+### Bounded Ending Position
+
+The config option `scan.bounded.mode` specifies the bounded mode for Kafka consumer. The valid enumerations are:
+<ul>
+<li><span markdown="span">`group-offsets`</span>: bounded by committed offsets in ZooKeeper / Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.</li>
+<li><span markdown="span">`latest-offset`</span>: bounded by latest offsets. This is evaluated at the start of consumption from a given partition.</li>
+<li><span markdown="span">`timestamp`</span>: bounded by a user-supplied timestamp.</li>
+<li><span markdown="span">`specific-offsets`</span>: bounded by user-supplied specific offsets for each partition.</li>
+</ul>
+
+If config option value `scan.bounded.mode` is not set the default is an unbounded table.
+
+If `timestamp` is specified, another config option `scan.bounded.timestamp-millis` is required to specify a specific bounded timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT.
+
+If `specific-offsets` is specified, another config option `scan.bounded.specific-offsets` is required to specify specific bounded offsets for each partition,
+e.g. an option value `partition:0,offset:42;partition:1,offset:300` indicates offset `42` for partition `0` and offset `300` for partition `1`. If an offset for a partition is not provided it will not consume from that partition.
+
 ### CDC Changelog Source
 
 Flink natively supports Kafka as a CDC changelog source. If messages in a Kafka topic are change event captured from other databases using a CDC tool, you can use the corresponding Flink CDC format to interpret the messages as INSERT/UPDATE/DELETE statements into a Flink SQL table.
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
index 1b327504587..7a17b1ff6f9 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java
@@ -233,4 +233,9 @@ public class KafkaSource<OUT>
     KafkaSubscriber getKafkaSubscriber() {
         return subscriber;
     }
+
+    @VisibleForTesting
+    OffsetsInitializer getStoppingOffsetsInitializer() {
+        return stoppingOffsetsInitializer;
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/BoundedMode.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/BoundedMode.java
new file mode 100644
index 00000000000..beb2306b581
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/BoundedMode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.annotation.Internal;
+
+/** End modes for the Kafka Consumer. */
+@Internal
+public enum BoundedMode {
+
+    /** Do not end consuming. */
+    UNBOUNDED,
+
+    /**
+     * End from committed offsets in ZK / Kafka brokers of a specific consumer group. This is
+     * evaluated at the start of consumption from a given partition.
+     */
+    GROUP_OFFSETS,
+
+    /**
+     * End from the latest offset. This is evaluated at the start of consumption from a given
+     * partition.
+     */
+    LATEST,
+
+    /** End from user-supplied timestamp for each partition. */
+    TIMESTAMP,
+
+    /**
+     * End from user-supplied specific offsets for each partition. If an offset for a partition is
+     * not provided it will not consume from that partition.
+     */
+    SPECIFIC_OFFSETS;
+}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
index 1150ab4b1c3..a6cdbcedc96 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
@@ -151,6 +151,12 @@ public class KafkaConnectorOptions {
                     .defaultValue(ScanStartupMode.GROUP_OFFSETS)
                     .withDescription("Startup mode for Kafka consumer.");
 
+    public static final ConfigOption<ScanBoundedMode> SCAN_BOUNDED_MODE =
+            ConfigOptions.key("scan.bounded.mode")
+                    .enumType(ScanBoundedMode.class)
+                    .defaultValue(ScanBoundedMode.UNBOUNDED)
+                    .withDescription("Bounded mode for Kafka consumer.");
+
     public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS =
             ConfigOptions.key("scan.startup.specific-offsets")
                     .stringType()
@@ -158,6 +164,13 @@ public class KafkaConnectorOptions {
                     .withDescription(
                             "Optional offsets used in case of \"specific-offsets\" startup mode");
 
+    public static final ConfigOption<String> SCAN_BOUNDED_SPECIFIC_OFFSETS =
+            ConfigOptions.key("scan.bounded.specific-offsets")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional offsets used in case of \"specific-offsets\" bounded mode");
+
     public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS =
             ConfigOptions.key("scan.startup.timestamp-millis")
                     .longType()
@@ -165,6 +178,13 @@ public class KafkaConnectorOptions {
                     .withDescription(
                             "Optional timestamp used in case of \"timestamp\" startup mode");
 
+    public static final ConfigOption<Long> SCAN_BOUNDED_TIMESTAMP_MILLIS =
+            ConfigOptions.key("scan.bounded.timestamp-millis")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional timestamp used in case of \"timestamp\" bounded mode");
+
     public static final ConfigOption<Duration> SCAN_TOPIC_PARTITION_DISCOVERY =
             ConfigOptions.key("scan.topic-partition-discovery.interval")
                     .durationType()
@@ -291,5 +311,45 @@ public class KafkaConnectorOptions {
         }
     }
 
+    /** Bounded mode for the Kafka consumer, see {@link #SCAN_BOUNDED_MODE}. */
+    public enum ScanBoundedMode implements DescribedEnum {
+        UNBOUNDED("unbounded", text("Do not stop consuming")),
+        LATEST_OFFSET(
+                "latest-offset",
+                text(
+                        "Bounded by latest offsets. This is evaluated at the start of consumption"
+                                + " from a given partition.")),
+        GROUP_OFFSETS(
+                "group-offsets",
+                text(
+                        "Bounded by committed offsets in ZooKeeper / Kafka brokers of a specific"
+                                + " consumer group. This is evaluated at the start of consumption"
+                                + " from a given partition.")),
+        TIMESTAMP("timestamp", text("Bounded by a user-supplied timestamp.")),
+        SPECIFIC_OFFSETS(
+                "specific-offsets",
+                text(
+                        "Bounded by user-supplied specific offsets for each partition. If an offset"
+                                + " for a partition is not provided it will not consume from that"
+                                + " partition."));
+        private final String value;
+        private final InlineElement description;
+
+        ScanBoundedMode(String value, InlineElement description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
+
     private KafkaConnectorOptions() {}
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
index fb848036e88..ef70644e5d9 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
@@ -24,10 +24,12 @@ import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ValueFieldsStrategy;
 import org.apache.flink.table.api.TableException;
@@ -56,6 +58,9 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
@@ -101,6 +106,7 @@ class KafkaConnectorOptionsUtil {
     public static void validateTableSourceOptions(ReadableConfig tableOptions) {
         validateSourceTopic(tableOptions);
         validateScanStartupMode(tableOptions);
+        validateScanBoundedMode(tableOptions);
     }
 
     public static void validateTableSinkOptions(ReadableConfig tableOptions) {
@@ -183,6 +189,49 @@ class KafkaConnectorOptionsUtil {
                         });
     }
 
+    private static void validateScanBoundedMode(ReadableConfig tableOptions) {
+        tableOptions
+                .getOptional(SCAN_BOUNDED_MODE)
+                .ifPresent(
+                        mode -> {
+                            switch (mode) {
+                                case TIMESTAMP:
+                                    if (!tableOptions
+                                            .getOptional(SCAN_BOUNDED_TIMESTAMP_MILLIS)
+                                            .isPresent()) {
+                                        throw new ValidationException(
+                                                String.format(
+                                                        "'%s' is required in '%s' bounded mode"
+                                                                + " but missing.",
+                                                        SCAN_BOUNDED_TIMESTAMP_MILLIS.key(),
+                                                        ScanBoundedMode.TIMESTAMP));
+                                    }
+
+                                    break;
+                                case SPECIFIC_OFFSETS:
+                                    if (!tableOptions
+                                            .getOptional(SCAN_BOUNDED_SPECIFIC_OFFSETS)
+                                            .isPresent()) {
+                                        throw new ValidationException(
+                                                String.format(
+                                                        "'%s' is required in '%s' bounded mode"
+                                                                + " but missing.",
+                                                        SCAN_BOUNDED_SPECIFIC_OFFSETS.key(),
+                                                        ScanBoundedMode.SPECIFIC_OFFSETS));
+                                    }
+                                    if (!isSingleTopic(tableOptions)) {
+                                        throw new ValidationException(
+                                                "Currently Kafka source only supports specific offset for single topic.");
+                                    }
+                                    String specificOffsets =
+                                            tableOptions.get(SCAN_BOUNDED_SPECIFIC_OFFSETS);
+                                    parseSpecificOffsets(
+                                            specificOffsets, SCAN_BOUNDED_SPECIFIC_OFFSETS.key());
+                                    break;
+                            }
+                        });
+    }
+
     private static void validateSinkPartitioner(ReadableConfig tableOptions) {
         tableOptions
                 .getOptional(SINK_PARTITIONER)
@@ -241,6 +290,23 @@ class KafkaConnectorOptionsUtil {
         return options;
     }
 
+    public static BoundedOptions getBoundedOptions(ReadableConfig tableOptions) {
+        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+        final BoundedMode boundedMode =
+                KafkaConnectorOptionsUtil.fromOption(tableOptions.get(SCAN_BOUNDED_MODE));
+        if (boundedMode == BoundedMode.SPECIFIC_OFFSETS) {
+            buildBoundedOffsets(tableOptions, tableOptions.get(TOPIC).get(0), specificOffsets);
+        }
+
+        final BoundedOptions options = new BoundedOptions();
+        options.boundedMode = boundedMode;
+        options.specificOffsets = specificOffsets;
+        if (boundedMode == BoundedMode.TIMESTAMP) {
+            options.boundedTimestampMillis = tableOptions.get(SCAN_BOUNDED_TIMESTAMP_MILLIS);
+        }
+        return options;
+    }
+
     private static void buildSpecificOffsets(
             ReadableConfig tableOptions,
             String topic,
@@ -256,6 +322,22 @@ class KafkaConnectorOptionsUtil {
                 });
     }
 
+    public static void buildBoundedOffsets(
+            ReadableConfig tableOptions,
+            String topic,
+            Map<KafkaTopicPartition, Long> specificOffsets) {
+        String specificOffsetsEndOpt = tableOptions.get(SCAN_BOUNDED_SPECIFIC_OFFSETS);
+        final Map<Integer, Long> offsetMap =
+                parseSpecificOffsets(specificOffsetsEndOpt, SCAN_BOUNDED_SPECIFIC_OFFSETS.key());
+
+        offsetMap.forEach(
+                (partition, offset) -> {
+                    final KafkaTopicPartition topicPartition =
+                            new KafkaTopicPartition(topic, partition);
+                    specificOffsets.put(topicPartition, offset);
+                });
+    }
+
     /**
      * Returns the {@link StartupMode} of Kafka Consumer by passed-in table-specific {@link
      * ScanStartupMode}.
@@ -279,6 +361,29 @@ class KafkaConnectorOptionsUtil {
         }
     }
 
+    /**
+     * Returns the {@link BoundedMode} of Kafka Consumer by passed-in table-specific {@link
+     * ScanBoundedMode}.
+     */
+    private static BoundedMode fromOption(ScanBoundedMode scanBoundedMode) {
+        switch (scanBoundedMode) {
+            case UNBOUNDED:
+                return BoundedMode.UNBOUNDED;
+            case LATEST_OFFSET:
+                return BoundedMode.LATEST;
+            case GROUP_OFFSETS:
+                return BoundedMode.GROUP_OFFSETS;
+            case TIMESTAMP:
+                return BoundedMode.TIMESTAMP;
+            case SPECIFIC_OFFSETS:
+                return BoundedMode.SPECIFIC_OFFSETS;
+
+            default:
+                throw new TableException(
+                        "Unsupported bounded mode. Validator should have checked that.");
+        }
+    }
+
     public static Properties getKafkaProperties(Map<String, String> tableOptions) {
         final Properties kafkaProperties = new Properties();
 
@@ -320,15 +425,15 @@ class KafkaConnectorOptionsUtil {
     }
 
     /**
-     * Parses SpecificOffsets String to Map.
+     * Parses specificOffsets String to Map.
      *
-     * <p>SpecificOffsets String format was given as following:
+     * <p>specificOffsets String format was given as following:
      *
      * <pre>
      *     scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300
      * </pre>
      *
-     * @return SpecificOffsets with Map format, key is partition, and value is offset
+     * @return specificOffsets with Map format, key is partition, and value is offset
      */
     public static Map<Integer, Long> parseSpecificOffsets(
             String specificOffsetsStr, String optionKey) {
@@ -581,5 +686,12 @@ class KafkaConnectorOptionsUtil {
         public long startupTimestampMillis;
     }
 
+    /** Kafka bounded options. * */
+    public static class BoundedOptions {
+        public BoundedMode boundedMode;
+        public Map<KafkaTopicPartition, Long> specificOffsets;
+        public long boundedTimestampMillis;
+    }
+
     private KafkaConnectorOptionsUtil() {}
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
index d6bf27a104b..c963da76206 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
@@ -25,12 +25,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
@@ -149,6 +151,21 @@ public class KafkaDynamicSource
      */
     protected final long startupTimestampMillis;
 
+    /** The bounded mode for the contained consumer (default is an unbounded data stream). */
+    protected final BoundedMode boundedMode;
+
+    /**
+     * Specific end offsets; only relevant when bounded mode is {@link
+     * BoundedMode#SPECIFIC_OFFSETS}.
+     */
+    protected final Map<KafkaTopicPartition, Long> specificBoundedOffsets;
+
+    /**
+     * The bounded timestamp to locate partition offsets; only relevant when bounded mode is {@link
+     * BoundedMode#TIMESTAMP}.
+     */
+    protected final long boundedTimestampMillis;
+
     /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */
     protected final boolean upsertMode;
 
@@ -167,6 +184,9 @@ public class KafkaDynamicSource
             StartupMode startupMode,
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
             long startupTimestampMillis,
+            BoundedMode boundedMode,
+            Map<KafkaTopicPartition, Long> specificBoundedOffsets,
+            long boundedTimestampMillis,
             boolean upsertMode,
             String tableIdentifier) {
         // Format attributes
@@ -200,6 +220,12 @@ public class KafkaDynamicSource
                 Preconditions.checkNotNull(
                         specificStartupOffsets, "Specific offsets must not be null.");
         this.startupTimestampMillis = startupTimestampMillis;
+        this.boundedMode =
+                Preconditions.checkNotNull(boundedMode, "Bounded mode must not be null.");
+        this.specificBoundedOffsets =
+                Preconditions.checkNotNull(
+                        specificBoundedOffsets, "Specific bounded offsets must not be null.");
+        this.boundedTimestampMillis = boundedTimestampMillis;
         this.upsertMode = upsertMode;
         this.tableIdentifier = tableIdentifier;
     }
@@ -314,6 +340,9 @@ public class KafkaDynamicSource
                         startupMode,
                         specificStartupOffsets,
                         startupTimestampMillis,
+                        boundedMode,
+                        specificBoundedOffsets,
+                        boundedTimestampMillis,
                         upsertMode,
                         tableIdentifier);
         copy.producedDataType = producedDataType;
@@ -350,6 +379,9 @@ public class KafkaDynamicSource
                 && startupMode == that.startupMode
                 && Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
                 && startupTimestampMillis == that.startupTimestampMillis
+                && boundedMode == that.boundedMode
+                && Objects.equals(specificBoundedOffsets, that.specificBoundedOffsets)
+                && boundedTimestampMillis == that.boundedTimestampMillis
                 && Objects.equals(upsertMode, that.upsertMode)
                 && Objects.equals(tableIdentifier, that.tableIdentifier)
                 && Objects.equals(watermarkStrategy, that.watermarkStrategy);
@@ -363,8 +395,8 @@ public class KafkaDynamicSource
                 physicalDataType,
                 keyDecodingFormat,
                 valueDecodingFormat,
-                keyProjection,
-                valueProjection,
+                Arrays.hashCode(keyProjection),
+                Arrays.hashCode(valueProjection),
                 keyPrefix,
                 topics,
                 topicPattern,
@@ -372,6 +404,9 @@ public class KafkaDynamicSource
                 startupMode,
                 specificStartupOffsets,
                 startupTimestampMillis,
+                boundedMode,
+                specificBoundedOffsets,
+                boundedTimestampMillis,
                 upsertMode,
                 tableIdentifier,
                 watermarkStrategy);
@@ -427,6 +462,30 @@ public class KafkaDynamicSource
                 break;
         }
 
+        switch (boundedMode) {
+            case UNBOUNDED:
+                kafkaSourceBuilder.setUnbounded(new NoStoppingOffsetsInitializer());
+                break;
+            case LATEST:
+                kafkaSourceBuilder.setBounded(OffsetsInitializer.latest());
+                break;
+            case GROUP_OFFSETS:
+                kafkaSourceBuilder.setBounded(OffsetsInitializer.committedOffsets());
+                break;
+            case SPECIFIC_OFFSETS:
+                Map<TopicPartition, Long> offsets = new HashMap<>();
+                specificBoundedOffsets.forEach(
+                        (tp, offset) ->
+                                offsets.put(
+                                        new TopicPartition(tp.getTopic(), tp.getPartition()),
+                                        offset));
+                kafkaSourceBuilder.setBounded(OffsetsInitializer.offsets(offsets));
+                break;
+            case TIMESTAMP:
+                kafkaSourceBuilder.setBounded(OffsetsInitializer.timestamp(boundedTimestampMillis));
+                break;
+        }
+
         kafkaSourceBuilder
                 .setProperties(properties)
                 .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer));
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 5ac146d7197..48c00918aa1 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -27,9 +27,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.format.DecodingFormat;
@@ -69,6 +71,9 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
@@ -85,6 +90,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
@@ -143,6 +149,9 @@ public class KafkaDynamicTableFactory
         options.add(DELIVERY_GUARANTEE);
         options.add(TRANSACTIONAL_ID_PREFIX);
         options.add(SINK_SEMANTIC);
+        options.add(SCAN_BOUNDED_MODE);
+        options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
+        options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
         return options;
     }
 
@@ -187,6 +196,8 @@ public class KafkaDynamicTableFactory
 
         final StartupOptions startupOptions = getStartupOptions(tableOptions);
 
+        final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);
+
         final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
 
         // add topic-partition discovery
@@ -217,6 +228,9 @@ public class KafkaDynamicTableFactory
                 startupOptions.startupMode,
                 startupOptions.specificOffsets,
                 startupOptions.startupTimestampMillis,
+                boundedOptions.boundedMode,
+                boundedOptions.specificOffsets,
+                boundedOptions.boundedTimestampMillis,
                 context.getObjectIdentifier().asSummaryString());
     }
 
@@ -378,6 +392,9 @@ public class KafkaDynamicTableFactory
             StartupMode startupMode,
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
             long startupTimestampMillis,
+            BoundedMode boundedMode,
+            Map<KafkaTopicPartition, Long> specificEndOffsets,
+            long endTimestampMillis,
             String tableIdentifier) {
         return new KafkaDynamicSource(
                 physicalDataType,
@@ -392,6 +409,9 @@ public class KafkaDynamicTableFactory
                 startupMode,
                 specificStartupOffsets,
                 startupTimestampMillis,
+                boundedMode,
+                specificEndOffsets,
+                endTimestampMillis,
                 false,
                 tableIdentifier);
     }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index 8682e405121..254e1bf9852 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -141,6 +142,9 @@ public class UpsertKafkaDynamicTableFactory
                 earliest,
                 Collections.emptyMap(),
                 0,
+                BoundedMode.UNBOUNDED,
+                Collections.emptyMap(),
+                0,
                 true,
                 context.getObjectIdentifier().asSummaryString());
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java
index 572b77d0787..e95f05babe1 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.kafka.source;
 
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
 
 import java.util.Collection;
@@ -50,4 +51,9 @@ public class KafkaSourceTestUtils {
     public static Configuration getKafkaSourceConfiguration(KafkaSource<?> kafkaSource) {
         return kafkaSource.getConfiguration();
     }
+
+    /** Get stopping offsets initializer. */
+    public static OffsetsInitializer getStoppingOffsetsInitializer(KafkaSource<?> kafkaSource) {
+        return kafkaSource.getStoppingOffsetsInitializer();
+    }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 0b908abfa5f..a0b74a5b763 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.table;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
@@ -30,6 +31,7 @@ import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
 import org.apache.flink.formats.avro.RowDataToAvroConverters;
@@ -38,6 +40,7 @@ import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroSer
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
@@ -73,7 +76,9 @@ import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.junit.jupiter.api.Test;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.NullSource;
@@ -82,6 +87,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -91,6 +97,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -103,7 +110,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Abstract test base for {@link KafkaDynamicTableFactory}. */
+/** Tests for {@link KafkaDynamicTableFactory}. */
 @ExtendWith(TestLoggerExtension.class)
 public class KafkaDynamicTableFactoryTest {
 
@@ -425,6 +432,220 @@ public class KafkaDynamicTableFactoryTest {
         }
     }
 
+    @Test
+    public void testBoundedSpecificOffsetsValidate() {
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(),
+                        options -> {
+                            options.put(
+                                    KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(),
+                                    "specific-offsets");
+                        });
+        assertThatThrownBy(() -> createTableSource(SCHEMA, modifiedOptions))
+                .cause()
+                .hasMessageContaining(
+                        "'scan.bounded.specific-offsets' is required in 'specific-offsets' bounded mode but missing.");
+    }
+
+    @Test
+    public void testBoundedSpecificOffsets() {
+        testBoundedOffsets(
+                "specific-offsets",
+                options -> {
+                    options.put("scan.bounded.specific-offsets", "partition:0,offset:2");
+                },
+                source -> {
+                    assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+                    OffsetsInitializer offsetsInitializer =
+                            KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+                    TopicPartition partition = new TopicPartition(TOPIC, 0);
+                    Map<TopicPartition, Long> partitionOffsets =
+                            offsetsInitializer.getPartitionOffsets(
+                                    Collections.singletonList(partition),
+                                    MockPartitionOffsetsRetriever.noInteractions());
+                    assertThat(partitionOffsets)
+                            .containsOnlyKeys(partition)
+                            .containsEntry(partition, 2L);
+                });
+    }
+
+    @Test
+    public void testBoundedLatestOffset() {
+        testBoundedOffsets(
+                "latest-offset",
+                options -> {},
+                source -> {
+                    assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+                    OffsetsInitializer offsetsInitializer =
+                            KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+                    TopicPartition partition = new TopicPartition(TOPIC, 0);
+                    Map<TopicPartition, Long> partitionOffsets =
+                            offsetsInitializer.getPartitionOffsets(
+                                    Collections.singletonList(partition),
+                                    MockPartitionOffsetsRetriever.noInteractions());
+                    assertThat(partitionOffsets)
+                            .containsOnlyKeys(partition)
+                            .containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET);
+                });
+    }
+
+    @Test
+    public void testBoundedGroupOffsets() {
+        testBoundedOffsets(
+                "group-offsets",
+                options -> {},
+                source -> {
+                    assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+                    OffsetsInitializer offsetsInitializer =
+                            KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+                    TopicPartition partition = new TopicPartition(TOPIC, 0);
+                    Map<TopicPartition, Long> partitionOffsets =
+                            offsetsInitializer.getPartitionOffsets(
+                                    Collections.singletonList(partition),
+                                    MockPartitionOffsetsRetriever.noInteractions());
+                    assertThat(partitionOffsets)
+                            .containsOnlyKeys(partition)
+                            .containsEntry(partition, KafkaPartitionSplit.COMMITTED_OFFSET);
+                });
+    }
+
+    @Test
+    public void testBoundedTimestamp() {
+        testBoundedOffsets(
+                "timestamp",
+                options -> {
+                    options.put("scan.bounded.timestamp-millis", "1");
+                },
+                source -> {
+                    assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED);
+                    OffsetsInitializer offsetsInitializer =
+                            KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
+                    TopicPartition partition = new TopicPartition(TOPIC, 0);
+                    long offsetForTimestamp = 123L;
+                    Map<TopicPartition, Long> partitionOffsets =
+                            offsetsInitializer.getPartitionOffsets(
+                                    Collections.singletonList(partition),
+                                    MockPartitionOffsetsRetriever.timestampAndEnd(
+                                            partitions -> {
+                                                assertThat(partitions)
+                                                        .containsOnlyKeys(partition)
+                                                        .containsEntry(partition, 1L);
+                                                Map<TopicPartition, OffsetAndTimestamp> result =
+                                                        new HashMap<>();
+                                                result.put(
+                                                        partition,
+                                                        new OffsetAndTimestamp(
+                                                                offsetForTimestamp, 1L));
+                                                return result;
+                                            },
+                                            partitions -> {
+                                                Map<TopicPartition, Long> result = new HashMap<>();
+                                                result.put(
+                                                        partition,
+                                                        // the end offset is bigger than given by
+                                                        // timestamp
+                                                        // to make sure the one for timestamp is
+                                                        // used
+                                                        offsetForTimestamp + 1000L);
+                                                return result;
+                                            }));
+                    assertThat(partitionOffsets)
+                            .containsOnlyKeys(partition)
+                            .containsEntry(partition, offsetForTimestamp);
+                });
+    }
+
+    private void testBoundedOffsets(
+            String boundedMode,
+            Consumer<Map<String, String>> optionsConfig,
+            Consumer<KafkaSource<?>> validator) {
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(),
+                        options -> {
+                            options.put(KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(), boundedMode);
+                            optionsConfig.accept(options);
+                        });
+        final DynamicTableSource tableSource = createTableSource(SCHEMA, modifiedOptions);
+        assertThat(tableSource).isInstanceOf(KafkaDynamicSource.class);
+        ScanTableSource.ScanRuntimeProvider provider =
+                ((KafkaDynamicSource) tableSource)
+                        .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+        final KafkaSource<?> kafkaSource = assertKafkaSource(provider);
+        validator.accept(kafkaSource);
+    }
+
+    private interface OffsetsRetriever
+            extends Function<Collection<TopicPartition>, Map<TopicPartition, Long>> {}
+
+    private interface TimestampOffsetsRetriever
+            extends Function<Map<TopicPartition, Long>, Map<TopicPartition, OffsetAndTimestamp>> {}
+
+    private static final class MockPartitionOffsetsRetriever
+            implements OffsetsInitializer.PartitionOffsetsRetriever {
+
+        public static final OffsetsRetriever UNSUPPORTED_RETRIEVAL =
+                partitions -> {
+                    throw new UnsupportedOperationException(
+                            "The method was not supposed to be called");
+                };
+        private final OffsetsRetriever committedOffsets;
+        private final OffsetsRetriever endOffsets;
+        private final OffsetsRetriever beginningOffsets;
+        private final TimestampOffsetsRetriever offsetsForTimes;
+
+        static MockPartitionOffsetsRetriever noInteractions() {
+            return new MockPartitionOffsetsRetriever(
+                    UNSUPPORTED_RETRIEVAL,
+                    UNSUPPORTED_RETRIEVAL,
+                    UNSUPPORTED_RETRIEVAL,
+                    partitions -> {
+                        throw new UnsupportedOperationException(
+                                "The method was not supposed to be called");
+                    });
+        }
+
+        static MockPartitionOffsetsRetriever timestampAndEnd(
+                TimestampOffsetsRetriever retriever, OffsetsRetriever endOffsets) {
+            return new MockPartitionOffsetsRetriever(
+                    UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, retriever);
+        }
+
+        private MockPartitionOffsetsRetriever(
+                OffsetsRetriever committedOffsets,
+                OffsetsRetriever endOffsets,
+                OffsetsRetriever beginningOffsets,
+                TimestampOffsetsRetriever offsetsForTimes) {
+            this.committedOffsets = committedOffsets;
+            this.endOffsets = endOffsets;
+            this.beginningOffsets = beginningOffsets;
+            this.offsetsForTimes = offsetsForTimes;
+        }
+
+        @Override
+        public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) {
+            return committedOffsets.apply(partitions);
+        }
+
+        @Override
+        public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+            return endOffsets.apply(partitions);
+        }
+
+        @Override
+        public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+            return beginningOffsets.apply(partitions);
+        }
+
+        @Override
+        public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
+                Map<TopicPartition, Long> timestampsToSearch) {
+            return offsetsForTimes.apply(timestampsToSearch);
+        }
+    }
+
     @Test
     public void testTableSink() {
         final Map<String, String> modifiedOptions =
@@ -974,6 +1195,9 @@ public class KafkaDynamicTableFactoryTest {
                 startupMode,
                 specificStartupOffsets,
                 startupTimestampMillis,
+                BoundedMode.UNBOUNDED,
+                Collections.emptyMap(),
+                0,
                 false,
                 FactoryMocks.IDENTIFIER.asSummaryString());
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 975e5cd4375..515526f935e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.utils.EncodingUtils;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -40,6 +41,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -179,6 +181,129 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "bounded_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        String groupId = getStandardProps().getProperty("group.id");
+        String bootstraps = getBootstrapServers();
+
+        final String createTable =
+                String.format(
+                        "CREATE TABLE kafka (\n"
+                                + "  `user_id` INT,\n"
+                                + "  `item_id` INT,\n"
+                                + "  `behavior` STRING\n"
+                                + ") WITH (\n"
+                                + "  'connector' = '%s',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'properties.group.id' = '%s',\n"
+                                + "  'scan.startup.mode' = 'earliest-offset',\n"
+                                + "  'scan.bounded.mode' = 'specific-offsets',\n"
+                                + "  'scan.bounded.specific-offsets' = 'partition:0,offset:2',\n"
+                                + "  %s\n"
+                                + ")\n",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        topic,
+                        bootstraps,
+                        groupId,
+                        formatOptions());
+
+        tEnv.executeSql(createTable);
+
+        List<Row> values =
+                Arrays.asList(
+                        Row.of(1, 1102, "behavior 1"),
+                        Row.of(2, 1103, "behavior 2"),
+                        Row.of(3, 1104, "behavior 3"));
+        tEnv.fromValues(values).insertInto("kafka").execute().await();
+
+        // ---------- Consume stream from Kafka -------------------
+
+        List<Row> results = new ArrayList<>();
+        try (CloseableIterator<Row> resultsItr =
+                tEnv.sqlQuery("SELECT * from kafka").execute().collect()) {
+            while (resultsItr.hasNext()) {
+                results.add(resultsItr.next());
+            }
+        }
+
+        assertThat(results)
+                .containsExactly(Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, "behavior 2"));
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic);
+    }
+
+    @Test
+    public void testKafkaSourceSinkWithBoundedTimestamp() throws Exception {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "bounded_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        String groupId = getStandardProps().getProperty("group.id");
+        String bootstraps = getBootstrapServers();
+
+        final String createTable =
+                String.format(
+                        "CREATE TABLE kafka (\n"
+                                + "  `user_id` INT,\n"
+                                + "  `item_id` INT,\n"
+                                + "  `behavior` STRING,\n"
+                                + "  `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'"
+                                + ") WITH (\n"
+                                + "  'connector' = '%s',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'properties.group.id' = '%s',\n"
+                                + "  'scan.startup.mode' = 'earliest-offset',\n"
+                                + "  'scan.bounded.mode' = 'timestamp',\n"
+                                + "  'scan.bounded.timestamp-millis' = '5',\n"
+                                + "  %s\n"
+                                + ")\n",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        topic,
+                        bootstraps,
+                        groupId,
+                        formatOptions());
+
+        tEnv.executeSql(createTable);
+
+        List<Row> values =
+                Arrays.asList(
+                        Row.of(1, 1102, "behavior 1", Instant.ofEpochMilli(0L)),
+                        Row.of(2, 1103, "behavior 2", Instant.ofEpochMilli(3L)),
+                        Row.of(3, 1104, "behavior 3", Instant.ofEpochMilli(7L)));
+        tEnv.fromValues(values).insertInto("kafka").execute().await();
+
+        // ---------- Consume stream from Kafka -------------------
+
+        List<Row> results = new ArrayList<>();
+        try (CloseableIterator<Row> resultsItr =
+                tEnv.sqlQuery("SELECT * from kafka").execute().collect()) {
+            while (resultsItr.hasNext()) {
+                results.add(resultsItr.next());
+            }
+        }
+
+        assertThat(results)
+                .containsExactly(
+                        Row.of(1, 1102, "behavior 1", Instant.ofEpochMilli(0L)),
+                        Row.of(2, 1103, "behavior 2", Instant.ofEpochMilli(3L)));
+
+        // ------------- cleanup -------------------
+
+        deleteTestTopic(topic);
+    }
+
     @Test
     public void testKafkaTableWithMultipleTopics() throws Exception {
         // ---------- create source and sink tables -------------------
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index 79884ad0929..5caaaa0acbc 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
 import org.apache.flink.table.api.DataTypes;
@@ -609,6 +610,9 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                 StartupMode.EARLIEST,
                 Collections.emptyMap(),
                 0,
+                BoundedMode.UNBOUNDED,
+                Collections.emptyMap(),
+                0,
                 true,
                 FactoryMocks.IDENTIFIER.asSummaryString());
     }