You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/19 03:38:03 UTC

[inlong] branch master updated: [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3318c1c30 [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
3318c1c30 is described below

commit 3318c1c306e72e0074b071f37937745bddfb4db5
Author: Xin Gong <ge...@gmail.com>
AuthorDate: Mon Sep 19 11:37:57 2022 +0800

    [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
---
 .../org/apache/inlong/sort/base/Constants.java     |    4 +
 .../inlong/sort/base/metric/MetricState.java       |    8 +
 .../inlong/sort/base/metric/SinkMetricData.java    |   82 +-
 .../inlong/sort/base/util/MetricStateUtils.java    |   24 +
 inlong-sort/sort-connectors/kafka/pom.xml          |    6 +
 .../inlong/sort/kafka/FlinkKafkaConsumer.java      |  352 +++++
 .../inlong/sort/kafka/FlinkKafkaConsumerBase.java  | 1350 ++++++++++++++++++++
 .../inlong/sort/kafka/FlinkKafkaProducer.java      |   69 +-
 .../table/DynamicKafkaDeserializationSchema.java   |   61 +-
 .../sort/kafka/table/KafkaDynamicSource.java       |   82 +-
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |    2 +
 licenses/inlong-sort-connectors/LICENSE            |   12 +
 12 files changed, 1933 insertions(+), 119 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 9daed86e0..93951770b 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -37,6 +37,10 @@ public final class Constants {
 
     public static final String NUM_RECORDS_OUT = "numRecordsOut";
 
+    public static final String NUM_BYTES_OUT_FOR_METER = "numBytesOutForMeter";
+
+    public static final String NUM_RECORDS_OUT_FOR_METER = "numRecordsOutForMeter";
+
     public static final String NUM_BYTES_OUT_PER_SECOND = "numBytesOutPerSecond";
 
     public static final String NUM_RECORDS_OUT_PER_SECOND = "numRecordsOutPerSecond";
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
index 9240c0c8a..604800ccf 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
@@ -62,4 +62,12 @@ public class MetricState implements Serializable {
         }
         return 0L;
     }
+
+    @Override
+    public String toString() {
+        return "MetricState{"
+                + "subtaskIndex=" + subtaskIndex
+                + ", metrics=" + metrics.toString()
+                + '}';
+    }
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 67b47657e..4073ddd44 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -34,8 +34,10 @@ import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
 
 /**
@@ -50,6 +52,8 @@ public class SinkMetricData implements MetricData {
     private AuditImp auditImp;
     private Counter numRecordsOut;
     private Counter numBytesOut;
+    private Counter numRecordsOutForMeter;
+    private Counter numBytesOutForMeter;
     private Counter dirtyRecords;
     private Counter dirtyBytes;
     private Meter numRecordsOutPerSecond;
@@ -76,6 +80,43 @@ public class SinkMetricData implements MetricData {
         }
     }
 
+    /**
+     * Default counter is {@link SimpleCounter}
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+     * prometheus metric reporter
+     */
+    public void registerMetricsForNumRecordsOutForMeter() {
+        registerMetricsForNumRecordsOutForMeter(new SimpleCounter());
+    }
+
+    /**
+     * User can use custom counter that extends from {@link Counter}
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+     * prometheus metric reporter
+     */
+    public void registerMetricsForNumRecordsOutForMeter(Counter counter) {
+        numRecordsOutForMeter = registerCounter(NUM_RECORDS_OUT_FOR_METER, counter);
+    }
+
+    /**
+     * Default counter is {@link SimpleCounter}
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+     * prometheus metric reporter
+     */
+    public void registerMetricsForNumBytesOutForMeter() {
+        registerMetricsForNumBytesOutForMeter(new SimpleCounter());
+
+    }
+
+    /**
+     * User can use custom counter that extends from {@link Counter}
+     * groupId, streamId and nodeId are label values; users can filter metric data by them when using the
+     * prometheus metric reporter
+     */
+    public void registerMetricsForNumBytesOutForMeter(Counter counter) {
+        numBytesOutForMeter = registerCounter(NUM_BYTES_OUT_FOR_METER, counter);
+    }
+
     /**
      * Default counter is {@link SimpleCounter}
      * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
@@ -114,11 +155,11 @@ public class SinkMetricData implements MetricData {
     }
 
     public void registerMetricsForNumRecordsOutPerSecond() {
-        numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, this.numRecordsOut);
+        numRecordsOutPerSecond = registerMeter(NUM_RECORDS_OUT_PER_SECOND, this.numRecordsOutForMeter);
     }
 
     public void registerMetricsForNumBytesOutPerSecond() {
-        numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, this.numBytesOut);
+        numBytesOutPerSecond = registerMeter(NUM_BYTES_OUT_PER_SECOND, this.numBytesOutForMeter);
     }
 
     public void registerMetricsForDirtyRecords() {
@@ -191,10 +232,20 @@ public class SinkMetricData implements MetricData {
         return nodeId;
     }
 
+    public Counter getNumRecordsOutForMeter() {
+        return numRecordsOutForMeter;
+    }
+
+    public Counter getNumBytesOutForMeter() {
+        return numBytesOutForMeter;
+    }
+
     public void invokeWithEstimate(Object o) {
         long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
-        getNumRecordsOut().inc();
-        getNumBytesOut().inc(size);
+        this.numRecordsOut.inc();
+        this.numBytesOut.inc(size);
+        this.numRecordsOutForMeter.inc();
+        this.numBytesOutForMeter.inc(size);
         if (auditImp != null) {
             auditImp.add(
                     Constants.AUDIT_SORT_OUTPUT,
@@ -207,8 +258,10 @@ public class SinkMetricData implements MetricData {
     }
 
     public void invoke(long rowCount, long rowSize) {
-        getNumRecordsOut().inc(rowCount);
-        getNumBytesOut().inc(rowSize);
+        this.numRecordsOut.inc(rowCount);
+        this.numBytesOut.inc(rowSize);
+        this.numRecordsOutForMeter.inc(rowCount);
+        this.numBytesOutForMeter.inc(rowSize);
         if (auditImp != null) {
             auditImp.add(
                     Constants.AUDIT_SORT_OUTPUT,
@@ -219,4 +272,21 @@ public class SinkMetricData implements MetricData {
                     rowSize);
         }
     }
+
+    @Override
+    public String toString() {
+        return "SinkMetricData{"
+                + "groupId='" + groupId + '\''
+                + ", streamId='" + streamId + '\''
+                + ", nodeId='" + nodeId + '\''
+                + ", numRecordsOut=" + numRecordsOut.getCount()
+                + ", numBytesOut=" + numBytesOut.getCount()
+                + ", numRecordsOutForMeter=" + numRecordsOutForMeter.getCount()
+                + ", numBytesOutForMeter=" + numBytesOutForMeter.getCount()
+                + ", dirtyRecords=" + dirtyRecords.getCount()
+                + ", dirtyBytes=" + dirtyBytes.getCount()
+                + ", numRecordsOutPerSecond=" + numRecordsOutPerSecond.getRate()
+                + ", numBytesOutPerSecond=" + numBytesOutPerSecond.getRate()
+                + '}';
+    }
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
index d878381ba..416c8b719 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
@@ -21,6 +21,7 @@ package org.apache.inlong.sort.base.util;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 
 import java.util.ArrayList;
@@ -29,7 +30,9 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * metric state for {@link MetricState} supporting snapshot and restore
@@ -125,4 +128,25 @@ public class MetricStateUtils {
         metricStateListState.add(metricState);
     }
 
+    /**
+     *
+     * Snapshot metric state data for {@link SinkMetricData}
+     * @param metricStateListState state data list
+     * @param sinkMetricData {@link SinkMetricData} A collection class for handling metrics
+     * @param subtaskIndex subtask index
+     * @throws Exception throw exception when add metric state
+     */
+    public static void snapshotMetricStateForSinkMetricData(ListState<MetricState> metricStateListState,
+            SinkMetricData sinkMetricData, Integer subtaskIndex)
+            throws Exception {
+        log.info("snapshotMetricStateForSinkMetricData:{}, sinkMetricData:{}, subtaskIndex:{}",
+                metricStateListState, sinkMetricData, subtaskIndex);
+        metricStateListState.clear();
+        Map<String, Long> metricDataMap = new HashMap<>();
+        metricDataMap.put(NUM_RECORDS_OUT, sinkMetricData.getNumRecordsOut().getCount());
+        metricDataMap.put(NUM_BYTES_OUT, sinkMetricData.getNumBytesOut().getCount());
+        MetricState metricState = new MetricState(subtaskIndex, metricDataMap);
+        metricStateListState.add(metricState);
+    }
+
 }
diff --git a/inlong-sort/sort-connectors/kafka/pom.xml b/inlong-sort/sort-connectors/kafka/pom.xml
index f6ace504d..13a1c2098 100644
--- a/inlong-sort/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-connectors/kafka/pom.xml
@@ -92,6 +92,12 @@
                                 </filter>
                             </filters>
                             <relocations>
+                                <relocation>
+                                    <pattern>org.apache.inlong.sort.base</pattern>
+                                    <shadedPattern>
+                                        org.apache.inlong.sort.kafka.shaded.org.apache.inlong.sort.base
+                                    </shadedPattern>
+                                </relocation>
                                 <relocation>
                                     <pattern>org.apache.kafka</pattern>
                                     <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
new file mode 100644
index 000000000..924944188
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
@@ -0,0 +1,352 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+import org.apache.flink.util.PropertiesUtil;
+import org.apache.flink.util.SerializedValue;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.PropertiesUtil.getBoolean;
+import static org.apache.flink.util.PropertiesUtil.getLong;
+
+/**
+ * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
+ *
+ * The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from Apache
+ * Kafka. The consumer can run in multiple parallel instances, each of which will pull data from one
+ * or more Kafka partitions.
+ *
+ * <p>The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
+ * during a failure, and that the computation processes elements "exactly once". (Note: These
+ * guarantees naturally assume that Kafka itself does not lose any data.)
+ *
+ * <p>Please note that Flink snapshots the offsets internally as part of its distributed
+ * checkpoints. The offsets committed to Kafka are only to bring the outside view of progress in
+ * sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of how
+ * far the Flink Kafka consumer has consumed a topic.
+ *
+ * <p>Please refer to Kafka's documentation for the available configuration properties:
+ * http://kafka.apache.org/documentation.html#newconsumerconfigs
+ */
+@PublicEvolving
+public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Configuration key to change the polling timeout.
+     */
+    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
+
+    /**
+     * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+     * available. If 0, returns immediately with any records that are available now.
+     */
+    public static final long DEFAULT_POLL_TIMEOUT = 100L;
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * User-supplied properties for Kafka.
+     */
+    protected final Properties properties;
+
+    /**
+     * From Kafka's Javadoc: The time, in milliseconds, spent waiting in poll if data is not
+     * available. If 0, returns immediately with any records that are available now
+     */
+    protected final long pollTimeout;
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates a new Kafka streaming source consumer.
+     *
+     * @param topic The name of the topic that should be consumed.
+     * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and
+     *         Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            String topic, DeserializationSchema<T> valueDeserializer, Properties props, String inlongMetric,
+            String auditHostAndPorts) {
+        this(Collections.singletonList(topic), valueDeserializer, props, inlongMetric, auditHostAndPorts);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer.
+     *
+     * <p>This constructor allows passing a {@link KafkaDeserializationSchema} for reading key/value
+     * pairs, offsets, and topic names from Kafka.
+     *
+     * @param topic The name of the topic that should be consumed.
+     * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages
+     *         and Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            String topic, KafkaDeserializationSchema<T> deserializer, Properties props, String inlongMetric,
+            String auditHostAndPorts) {
+        this(Collections.singletonList(topic), deserializer, props, inlongMetric, auditHostAndPorts);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer.
+     *
+     * <p>This constructor allows passing multiple topics to the consumer.
+     *
+     * @param topics The Kafka topics to read from.
+     * @param deserializer The de-/serializer used to convert between Kafka's byte messages and
+     *         Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            List<String> topics, DeserializationSchema<T> deserializer, Properties props, String inlongMetric,
+            String auditHostAndPorts) {
+        this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props, inlongMetric, auditHostAndPorts);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer.
+     *
+     * <p>This constructor allows passing multiple topics and a key/value deserialization schema.
+     *
+     * @param topics The Kafka topics to read from.
+     * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages
+     *         and Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props, String inlongMetric,
+            String auditHostAndPorts) {
+        this(topics, null, deserializer, props, inlongMetric, auditHostAndPorts);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer. Use this constructor to subscribe to multiple
+     * topics based on a regular expression pattern.
+     *
+     * <p>If partition discovery is enabled (by setting a non-negative value for {@link
+     * FlinkKafkaConsumer#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics with
+     * names matching the pattern will also be subscribed to as they are created on the fly.
+     *
+     * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe
+     *         to.
+     * @param valueDeserializer The de-/serializer used to convert between Kafka's byte messages and
+     *         Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer,
+            Properties props, String inlongMetric, String auditHostAndPorts) {
+        this(
+                null,
+                subscriptionPattern,
+                new KafkaDeserializationSchemaWrapper<>(valueDeserializer),
+                props, inlongMetric, auditHostAndPorts);
+    }
+
+    /**
+     * Creates a new Kafka streaming source consumer. Use this constructor to subscribe to multiple
+     * topics based on a regular expression pattern.
+     *
+     * <p>If partition discovery is enabled (by setting a non-negative value for {@link
+     * FlinkKafkaConsumer#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics with
+     * names matching the pattern will also be subscribed to as they are created on the fly.
+     *
+     * <p>This constructor allows passing a {@link KafkaDeserializationSchema} for reading key/value
+     * pairs, offsets, and topic names from Kafka.
+     *
+     * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe
+     *         to.
+     * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages
+     *         and Flink's objects.
+     * @param props
+     */
+    public FlinkKafkaConsumer(
+            Pattern subscriptionPattern,
+            KafkaDeserializationSchema<T> deserializer,
+            Properties props, String inlongMetric, String auditHostAndPorts) {
+        this(null, subscriptionPattern, deserializer, props, inlongMetric, auditHostAndPorts);
+    }
+
+    private FlinkKafkaConsumer(
+            List<String> topics,
+            Pattern subscriptionPattern,
+            KafkaDeserializationSchema<T> deserializer,
+            Properties props, String inlongMetric,
+            String auditHostAndPorts) {
+
+        super(
+                topics,
+                subscriptionPattern,
+                deserializer,
+                getLong(
+                        checkNotNull(props, "props"),
+                        KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+                        PARTITION_DISCOVERY_DISABLED),
+                !getBoolean(props, KEY_DISABLE_METRICS, false), inlongMetric, auditHostAndPorts);
+
+        this.properties = props;
+        setDeserializer(this.properties);
+
+        // configure the polling timeout
+        try {
+            if (properties.containsKey(KEY_POLL_TIMEOUT)) {
+                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
+            } else {
+                this.pollTimeout = DEFAULT_POLL_TIMEOUT;
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    "Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
+        }
+    }
+
+    @Override
+    protected AbstractFetcher<T, ?> createFetcher(
+            SourceContext<T> sourceContext,
+            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
+            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+            StreamingRuntimeContext runtimeContext,
+            OffsetCommitMode offsetCommitMode,
+            MetricGroup consumerMetricGroup,
+            boolean useMetrics)
+            throws Exception {
+
+        // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS;
+        // this overwrites whatever setting the user configured in the properties
+        adjustAutoCommitConfig(properties, offsetCommitMode);
+
+        return new KafkaFetcher<>(
+                sourceContext,
+                assignedPartitionsWithInitialOffsets,
+                watermarkStrategy,
+                runtimeContext.getProcessingTimeService(),
+                runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+                runtimeContext.getUserCodeClassLoader(),
+                runtimeContext.getTaskNameWithSubtasks(),
+                deserializer,
+                properties,
+                pollTimeout,
+                runtimeContext.getMetricGroup(),
+                consumerMetricGroup,
+                useMetrics);
+    }
+
+    @Override
+    protected AbstractPartitionDiscoverer createPartitionDiscoverer(
+            KafkaTopicsDescriptor topicsDescriptor,
+            int indexOfThisSubtask,
+            int numParallelSubtasks) {
+
+        return new KafkaPartitionDiscoverer(
+                topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);
+    }
+
+    @Override
+    protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
+            Collection<KafkaTopicPartition> partitions, long timestamp) {
+
+        Map<TopicPartition, Long> partitionOffsetsRequest = new HashMap<>(partitions.size());
+        for (KafkaTopicPartition partition : partitions) {
+            partitionOffsetsRequest.put(
+                    new TopicPartition(partition.getTopic(), partition.getPartition()), timestamp);
+        }
+
+        final Map<KafkaTopicPartition, Long> result = new HashMap<>(partitions.size());
+        // use a short-lived consumer to fetch the offsets;
+        // this is ok because this is a one-time operation that happens only on startup
+        try (KafkaConsumer<?, ?> consumer = new KafkaConsumer(properties)) {
+            for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
+                    consumer.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
+
+                result.put(
+                        new KafkaTopicPartition(
+                                partitionToOffset.getKey().topic(),
+                                partitionToOffset.getKey().partition()),
+                        (partitionToOffset.getValue() == null)
+                                ? null
+                                : partitionToOffset.getValue().offset());
+            }
+        }
+        return result;
+    }
+
+    @Override
+    protected boolean getIsAutoCommitEnabled() {
+        return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+                && PropertiesUtil.getLong(
+                properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000)
+                > 0;
+    }
+
+    /**
+     * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
+     *
+     * @param props The Kafka properties to register the serializer in.
+     */
+    private static void setDeserializer(Properties props) {
+        final String deSerName = ByteArrayDeserializer.class.getName();
+
+        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+
+        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
+            LOG.warn(
+                    "Ignoring configured key DeSerializer ({})",
+                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        }
+        if (valDeSer != null && !valDeSer.equals(deSerName)) {
+            LOG.warn(
+                    "Ignoring configured value DeSerializer ({})",
+                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+        }
+
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
+    }
+}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
new file mode 100644
index 000000000..0d0ab4544
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
@@ -0,0 +1,1350 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
+import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
+import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
+import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedValue;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_FAILED_METRICS_COUNTER;
+import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_SUCCEEDED_METRICS_COUNTER;
+import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.KAFKA_CONSUMER_METRICS_GROUP;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+
+/**
+ * Copied from org.apache.flink:flink-connector-kafka_2.11:1.13.5.
+ *
+ * Base class of all Flink Kafka Consumer data sources. This implements the common behavior across
+ * all Kafka versions.
+ *
+ * <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the {@link
+ * AbstractFetcher}.
+ *
+ * @param <T> The type of records produced by this data source
+ */
+@Internal
+public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
+        implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {
+
+    /**
+     * Serialization version identifier of this source function.
+     */
+    private static final long serialVersionUID = -6272159445203409112L;
+
+    /**
+     * Logger shared with subclasses.
+     */
+    protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
+
+    /**
+     * The maximum number of pending non-committed checkpoints to track, to avoid memory leaks.
+     */
+    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
+
+    /**
+     * The default interval to execute partition discovery, in milliseconds ({@code Long.MIN_VALUE},
+     * i.e. disabled by default). This is the only negative interval accepted by the constructor.
+     */
+    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
+
+    /**
+     * Boolean configuration key to disable metrics tracking.
+     */
+    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
+
+    /**
+     * Configuration key to define the consumer's partition discovery interval, in milliseconds.
+     */
+    public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS =
+            "flink.partition-discovery.interval-millis";
+
+    /**
+     * State name of the consumer's partition offset states.
+     */
+    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
+
+    // ------------------------------------------------------------------------
+    //  configuration state, set on the client relevant for all subtasks
+    // ------------------------------------------------------------------------
+
+    /**
+     * Describes whether we are discovering partitions for fixed topics or a topic pattern.
+     */
+    private final KafkaTopicsDescriptor topicsDescriptor;
+
+    /**
+     * The schema to convert between Kafka's byte messages, and Flink's objects. Never
+     * {@code null}; enforced by the constructor.
+     */
+    protected final KafkaDeserializationSchema<T> deserializer;
+
+    /**
+     * The set of topic partitions that the source will read, with their initial offsets to start
+     * reading from. Re-created as an empty map in {@link #open(Configuration)} before being
+     * populated.
+     */
+    private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
+
+    /**
+     * Optional watermark strategy that will be run per Kafka partition, to exploit per-partition
+     * timestamp characteristics. The watermark strategy is kept in serialized form, to deserialize
+     * it into multiple copies. {@code null} until one of the
+     * {@code assignTimestampsAndWatermarks} methods has been called.
+     */
+    private SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
+
+    /**
+     * User-set flag determining whether or not to commit on checkpoints. Note: this flag does not
+     * represent the final offset commit mode.
+     */
+    private boolean enableCommitOnCheckpoints = true;
+
+    /**
+     * User-set flag to disable filtering restored partitions with current topics descriptor.
+     * Filtering is enabled ({@code true}) by default.
+     */
+    private boolean filterRestoredPartitionsWithCurrentTopicsDescriptor = true;
+
+    /**
+     * The offset commit mode for the consumer. The value of this can only be determined in {@link
+     * FlinkKafkaConsumerBase#open(Configuration)} since it depends on whether or not checkpointing
+     * is enabled for the job.
+     */
+    private OffsetCommitMode offsetCommitMode;
+
+    /**
+     * User configured value for discovery interval, in milliseconds. Either non-negative or
+     * {@link #PARTITION_DISCOVERY_DISABLED}; enforced by the constructor.
+     */
+    private final long discoveryIntervalMillis;
+
+    /**
+     * The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}).
+     */
+    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+
+    /**
+     * Specific startup offsets; only relevant when startup mode is {@link
+     * StartupMode#SPECIFIC_OFFSETS}. Reset to {@code null} by the setters of the other startup
+     * modes.
+     */
+    private Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+    /**
+     * Timestamp to determine startup offsets; only relevant when startup mode is {@link
+     * StartupMode#TIMESTAMP}. Reset to {@code null} by the setters of the other startup modes.
+     */
+    private Long startupOffsetsTimestamp;
+
+    // ------------------------------------------------------------------------
+    //  runtime state (used individually by each parallel subtask)
+    // ------------------------------------------------------------------------
+
+    /**
+     * Data for pending but uncommitted offsets.
+     *
+     * <p>NOTE(review): declared as a raw {@link LinkedMap}; the key and value types are not
+     * visible in this declaration, so check the usages before relying on them.
+     */
+    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
+
+    /**
+     * The fetcher implements the connections to the Kafka brokers.
+     */
+    private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
+
+    /**
+     * The partition discoverer, used to find new partitions. Created and opened in
+     * {@link #open(Configuration)}.
+     */
+    private transient volatile AbstractPartitionDiscoverer partitionDiscoverer;
+
+    /**
+     * The offsets to restore to, if the consumer restores state from a checkpoint.
+     *
+     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)}
+     * method.
+     *
+     * <p>Using a sorted map as the ordering is important when using restored state to seed the
+     * partition discoverer.
+     *
+     * <p>When this is non-null, {@link #open(Configuration)} additionally inserts every discovered
+     * partition that is missing from it, using the earliest-offset sentinel.
+     */
+    private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;
+
+    /**
+     * Accessor for state in the operator state backend.
+     */
+    private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
+
+    /**
+     * Discovery loop, executed in a separate thread.
+     */
+    private transient volatile Thread discoveryLoopThread;
+
+    /**
+     * Flag indicating whether the consumer is still running.
+     */
+    private volatile boolean running = true;
+
+    // ------------------------------------------------------------------------
+    //  internal metrics
+    // ------------------------------------------------------------------------
+
+    /**
+     * Flag indicating whether or not metrics should be exposed. If {@code true}, offset metrics
+     * (e.g. current offset, committed offset) and Kafka-shipped metrics will be registered.
+     */
+    private final boolean useMetrics;
+
+    /**
+     * Counter for successful Kafka offset commits.
+     */
+    private transient Counter successfulCommits;
+
+    /**
+     * Counter for failed Kafka offset commits.
+     */
+    private transient Counter failedCommits;
+
+    /**
+     * Callback interface that will be invoked upon async Kafka commit completion. Please be aware
+     * that default callback implementation in base class does not provide any guarantees on
+     * thread-safety. This is sufficient for now because current supported Kafka connectors
+     * guarantee no more than 1 concurrent async pending offset commit.
+     */
+    private transient KafkaCommitCallback offsetCommitCallback;
+
+    /**
+     * Operator list state holding {@link MetricState} entries.
+     *
+     * <p>NOTE(review): presumably registered in
+     * {@link #initializeState(FunctionInitializationContext)}; the registration is not visible in
+     * this part of the file.
+     */
+    private transient ListState<MetricState> metricStateListState;
+
+    /**
+     * The {@link MetricState} held by this consumer instance.
+     *
+     * <p>NOTE(review): not declared {@code transient}, unlike the neighbouring runtime fields;
+     * confirm that this is intended.
+     */
+    private MetricState metricState;
+
+    /**
+     * Metric for InLong; set from the constructor argument of the same name.
+     */
+    private String inlongMetric;
+    /**
+     * Audit host and ports; set from the {@code auditHostAndPorts} constructor argument.
+     */
+    private String inlongAudit;
+
+    /**
+     * InLong source metric holder.
+     *
+     * <p>NOTE(review): not declared {@code transient}, and its initialization is not visible in
+     * this part of the file; confirm where it is created.
+     */
+    private SourceMetricData sourceMetricData;
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Base constructor.
+     *
+     * @param topics fixed list of topics to subscribe to (null, if using topic pattern)
+     * @param topicPattern the topic pattern to subscribe to (null, if using fixed topics)
+     * @param deserializer The deserializer to turn raw byte messages into Java/Scala objects.
+     * @param discoveryIntervalMillis the topic / partition discovery interval, in milliseconds (0
+     *         if discovery is disabled).
+     */
+    public FlinkKafkaConsumerBase(
+            List<String> topics,
+            Pattern topicPattern,
+            KafkaDeserializationSchema<T> deserializer,
+            long discoveryIntervalMillis,
+            boolean useMetrics, String inlongMetric, String auditHostAndPorts) {
+        this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
+        this.deserializer = checkNotNull(deserializer, "valueDeserializer");
+
+        checkArgument(
+                discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED
+                        || discoveryIntervalMillis >= 0,
+                "Cannot define a negative value for the topic / partition discovery interval.");
+        this.discoveryIntervalMillis = discoveryIntervalMillis;
+
+        this.useMetrics = useMetrics;
+        this.inlongMetric = inlongMetric;
+        this.inlongAudit = auditHostAndPorts;
+    }
+
+    /**
+     * Make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS. This
+     * overwrites whatever setting the user configured in the properties.
+     *
+     * @param properties - Kafka configuration properties to be adjusted
+     * @param offsetCommitMode offset commit mode
+     */
+    protected static void adjustAutoCommitConfig(
+            Properties properties, OffsetCommitMode offsetCommitMode) {
+        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS
+                || offsetCommitMode == OffsetCommitMode.DISABLED) {
+            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        }
+    }
+    // ------------------------------------------------------------------------
+    //  Configuration
+    // ------------------------------------------------------------------------
+
+    /**
+     * Sets the given {@link WatermarkStrategy} on this consumer. It is used to assign timestamps
+     * to records and to generate watermarks that signal event time progress.
+     *
+     * <p>The strategy runs directly inside the Kafka source, per Kafka partition, which lets it
+     * exploit per-partition characteristics. Those are usually lost otherwise: when one source
+     * subtask reads several partitions, their streams are unioned in a "first come first serve"
+     * fashion, so timestamps that are strictly ascending per partition are no longer strictly
+     * ascending in the resulting Flink DataStream.
+     *
+     * <p>Common watermark generation patterns can be found as static methods in the {@link
+     * WatermarkStrategy} class.
+     *
+     * @param watermarkStrategy the strategy to use; must not be null. It is closure-cleaned and
+     *         kept in serialized form.
+     * @return The consumer object, to allow function chaining.
+     * @throws IllegalArgumentException if the strategy cannot be cleaned or serialized
+     */
+    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
+            WatermarkStrategy<T> watermarkStrategy) {
+        checkNotNull(watermarkStrategy);
+
+        final SerializedValue<WatermarkStrategy<T>> serializedStrategy;
+        try {
+            ClosureCleaner.clean(
+                    watermarkStrategy, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+            serializedStrategy = new SerializedValue<>(watermarkStrategy);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    "The given WatermarkStrategy is not serializable", e);
+        }
+
+        // only reached when cleaning and serialization both succeeded
+        this.watermarkStrategy = serializedStrategy;
+        return this;
+    }
+
+    /**
+     * Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated
+     * manner. The watermark extractor runs per Kafka partition; watermarks are merged across
+     * partitions in the same way as in the Flink runtime, when streams are merged.
+     *
+     * <p>Running the extractor inside the Kafka source, per partition, lets it exploit
+     * per-partition characteristics. Those are usually lost otherwise: when one source subtask
+     * reads several partitions, their streams are unioned in a "first come first serve" fashion,
+     * so timestamps that are strictly ascending per partition are no longer strictly ascending in
+     * the resulting Flink DataStream.
+     *
+     * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an {@link
+     * AssignerWithPeriodicWatermarks}, not both at the same time.
+     *
+     * <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link
+     * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new
+     * interfaces support watermark idleness and no longer need to differentiate between "periodic"
+     * and "punctuated" watermarks.
+     *
+     * @param assigner The timestamp assigner / watermark generator to use; must not be null.
+     * @return The consumer object, to allow function chaining.
+     * @throws IllegalStateException if a watermark strategy has already been set
+     * @throws IllegalArgumentException if the assigner cannot be cleaned or serialized
+     * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
+     */
+    @Deprecated
+    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
+            AssignerWithPunctuatedWatermarks<T> assigner) {
+        checkNotNull(assigner);
+
+        final boolean strategyAlreadySet = this.watermarkStrategy != null;
+        if (strategyAlreadySet) {
+            throw new IllegalStateException("Some watermark strategy has already been set.");
+        }
+
+        try {
+            ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+            final WatermarkStrategy<T> adaptedStrategy =
+                    new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(assigner);
+
+            // the delegate call stays inside the try block so that its failures are wrapped too
+            return assignTimestampsAndWatermarks(adaptedStrategy);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("The given assigner is not serializable", e);
+        }
+    }
+
+    /**
+     * Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks periodically. The
+     * watermark extractor runs per Kafka partition; watermarks are merged across partitions in
+     * the same way as in the Flink runtime, when streams are merged.
+     *
+     * <p>Running the extractor inside the Kafka source, per partition, lets it exploit
+     * per-partition characteristics. Those are usually lost otherwise: when one source subtask
+     * reads more than one partition, the streams are unioned in a "first come first serve"
+     * fashion, so timestamps that are strictly ascending per partition are no longer strictly
+     * ascending in the resulting Flink DataStream.
+     *
+     * <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an {@link
+     * AssignerWithPeriodicWatermarks}, not both at the same time.
+     *
+     * <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link
+     * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new
+     * interfaces support watermark idleness and no longer need to differentiate between "periodic"
+     * and "punctuated" watermarks.
+     *
+     * @param assigner The timestamp assigner / watermark generator to use; must not be null.
+     * @return The consumer object, to allow function chaining.
+     * @throws IllegalStateException if a watermark strategy has already been set
+     * @throws IllegalArgumentException if the assigner cannot be cleaned or serialized
+     * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
+     */
+    @Deprecated
+    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(
+            AssignerWithPeriodicWatermarks<T> assigner) {
+        checkNotNull(assigner);
+
+        final boolean strategyAlreadySet = this.watermarkStrategy != null;
+        if (strategyAlreadySet) {
+            throw new IllegalStateException("Some watermark strategy has already been set.");
+        }
+
+        try {
+            ClosureCleaner.clean(assigner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+            final WatermarkStrategy<T> adaptedStrategy =
+                    new AssignerWithPeriodicWatermarksAdapter.Strategy<>(assigner);
+
+            // the delegate call stays inside the try block so that its failures are wrapped too
+            return assignTimestampsAndWatermarks(adaptedStrategy);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("The given assigner is not serializable", e);
+        }
+    }
+
+    /**
+     * Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
+     *
+     * <p>This setting only takes effect if checkpointing is enabled for the job. Without
+     * checkpointing, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
+     * property settings are used.
+     *
+     * @param commitOnCheckpoints {@code true} to commit offsets on checkpoints, {@code false}
+     *         otherwise
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
+        // only records the user's wish; the effective commit mode is derived from it later
+        enableCommitOnCheckpoints = commitOnCheckpoints;
+        return this;
+    }
+
+    /**
+     * Configures the consumer to start reading from the earliest offset of every partition,
+     * ignoring any committed group offsets in Zookeeper / Kafka brokers. Any previously configured
+     * startup timestamp or specific startup offsets are discarded.
+     *
+     * <p>This has no influence on a consumer that is restored from a checkpoint or savepoint; in
+     * that case only the offsets in the restored state are used.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
+        // drop the settings that belong to the other startup modes
+        specificStartupOffsets = null;
+        startupOffsetsTimestamp = null;
+
+        startupMode = StartupMode.EARLIEST;
+        return this;
+    }
+
+    /**
+     * Configures the consumer to start reading from the latest offset of every partition,
+     * ignoring any committed group offsets in Zookeeper / Kafka brokers. Any previously configured
+     * startup timestamp or specific startup offsets are discarded.
+     *
+     * <p>This has no influence on a consumer that is restored from a checkpoint or savepoint; in
+     * that case only the offsets in the restored state are used.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> setStartFromLatest() {
+        // drop the settings that belong to the other startup modes
+        specificStartupOffsets = null;
+        startupOffsetsTimestamp = null;
+
+        startupMode = StartupMode.LATEST;
+        return this;
+    }
+
+    /**
+     * Configures the consumer to start reading partitions from a given timestamp, ignoring any
+     * committed group offsets in Zookeeper / Kafka brokers. The timestamp must be non-negative
+     * and must not lie after the current system time at the moment of this call. Any previously
+     * configured specific startup offsets are discarded.
+     *
+     * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal to
+     * the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+     * latest offset to read data from kafka.
+     *
+     * <p>This has no influence on a consumer that is restored from a checkpoint or savepoint; in
+     * that case only the offsets in the restored state are used.
+     *
+     * @param startupOffsetsTimestamp timestamp for the startup offsets, as milliseconds from epoch.
+     * @return The consumer object, to allow function chaining.
+     * @throws IllegalArgumentException if the timestamp is negative or after the current time
+     */
+    public FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
+        checkArgument(
+                startupOffsetsTimestamp >= 0,
+                "The provided value for the startup offsets timestamp is invalid.");
+
+        final long now = System.currentTimeMillis();
+        checkArgument(
+                now >= startupOffsetsTimestamp,
+                "Startup time[%s] must be before current time[%s].",
+                startupOffsetsTimestamp,
+                now);
+
+        // both checks passed: switch the mode and drop the specific-offsets setting
+        this.specificStartupOffsets = null;
+        this.startupOffsetsTimestamp = startupOffsetsTimestamp;
+        this.startupMode = StartupMode.TIMESTAMP;
+        return this;
+    }
+
+    /**
+     * Configures the consumer to start reading from any committed group offsets found in
+     * Zookeeper / Kafka brokers. The "group.id" property must be set in the configuration
+     * properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
+     * set in the configuration properties will be used for the partition. Any previously
+     * configured startup timestamp or specific startup offsets are discarded.
+     *
+     * <p>This has no influence on a consumer that is restored from a checkpoint or savepoint; in
+     * that case only the offsets in the restored state are used.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
+        // drop the settings that belong to the other startup modes
+        specificStartupOffsets = null;
+        startupOffsetsTimestamp = null;
+
+        startupMode = StartupMode.GROUP_OFFSETS;
+        return this;
+    }
+
+    /**
+     * Specifies the consumer to start reading partitions from specific offsets, set independently
+     * for each partition. The specified offset should be the offset of the next record that will be
+     * read from partitions. This lets the consumer ignore any committed group offsets in Zookeeper
+     * / Kafka brokers.
+     *
+     * <p>If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not
+     * subscribed by the consumer, the entry will be ignored. If the consumer subscribes to a
+     * partition that does not exist in the provided map of offsets, the consumer will fallback to
+     * the default group offset behaviour (see {@link
+     * FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
+     *
+     * <p>If the specified offset for a partition is invalid, or the behaviour for that partition is
+     * defaulted to group offsets but still no group offset could be found for it, then the
+     * "auto.offset.reset" behaviour set in the configuration properties will be used for the
+     * partition.
+     *
+     * <p>This method does not affect where partitions are read from when the consumer is restored
+     * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or savepoint,
+     * only the offsets in the restored state will be used.
+     *
+     * @param specificStartupOffsets the offsets to start from, per partition; must not be null.
+     *         The map is stored as given, not copied.
+     * @return The consumer object, to allow function chaining.
+     * @throws NullPointerException if {@code specificStartupOffsets} is null; the previously
+     *         configured startup mode is left unchanged in that case
+     */
+    public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(
+            Map<KafkaTopicPartition, Long> specificStartupOffsets) {
+        // validate before touching any field, so that a rejected call cannot leave the consumer
+        // in SPECIFIC_OFFSETS mode without usable offsets
+        checkNotNull(specificStartupOffsets);
+
+        this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+        this.startupOffsetsTimestamp = null;
+        this.specificStartupOffsets = specificStartupOffsets;
+        return this;
+    }
+
+    /**
+     * Turns off the filtering of restored partitions against the currently specified topics or
+     * topic pattern.
+     *
+     * <p>By default, a consumer restored from a checkpoint / savepoint ignores restored partitions
+     * that are no longer associated with the topics it subscribes to. After calling this method
+     * the consumer always attempts to consume whatever partition was present in the previous
+     * execution, regardless of the topics specified for the current execution.
+     *
+     * @return The consumer object, to allow function chaining.
+     */
+    public FlinkKafkaConsumerBase<T> disableFilterRestoredPartitionsWithSubscribedTopics() {
+        // evaluated later, when the restored partitions are assigned to this subtask
+        filterRestoredPartitionsWithCurrentTopicsDescriptor = false;
+        return this;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Work methods
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        // determine the offset commit mode
+        this.offsetCommitMode =
+                OffsetCommitModes.fromConfiguration(
+                        getIsAutoCommitEnabled(),
+                        enableCommitOnCheckpoints,
+                        ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
+
+        // create the partition discoverer
+        this.partitionDiscoverer =
+                createPartitionDiscoverer(
+                        topicsDescriptor,
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        getRuntimeContext().getNumberOfParallelSubtasks());
+        this.partitionDiscoverer.open();
+
+        subscribedPartitionsToStartOffsets = new HashMap<>();
+        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
+        if (restoredState != null) {
+            for (KafkaTopicPartition partition : allPartitions) {
+                if (!restoredState.containsKey(partition)) {
+                    restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+                }
+            }
+
+            for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry :
+                    restoredState.entrySet()) {
+                // seed the partition discoverer with the union state while filtering out
+                // restored partitions that should not be subscribed by this subtask
+                if (KafkaTopicPartitionAssigner.assign(
+                        restoredStateEntry.getKey(),
+                        getRuntimeContext().getNumberOfParallelSubtasks())
+                        == getRuntimeContext().getIndexOfThisSubtask()) {
+                    subscribedPartitionsToStartOffsets.put(
+                            restoredStateEntry.getKey(), restoredStateEntry.getValue());
+                }
+            }
+
+            if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
+                subscribedPartitionsToStartOffsets
+                        .entrySet()
+                        .removeIf(
+                                entry -> {
+                                    if (!topicsDescriptor.isMatchingTopic(
+                                            entry.getKey().getTopic())) {
+                                        LOG.warn(
+                                                "{} is removed from subscribed partitions since it is no longer "
+                                                        + "associated with topics descriptor of current execution.",
+                                                entry.getKey());
+                                        return true;
+                                    }
+                                    return false;
+                                });
+            }
+
+            LOG.info(
+                    "Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
+                    getRuntimeContext().getIndexOfThisSubtask(),
+                    subscribedPartitionsToStartOffsets.size(),
+                    subscribedPartitionsToStartOffsets);
+        } else {
+            // use the partition discoverer to fetch the initial seed partitions,
+            // and set their initial offsets depending on the startup mode.
+            // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
+            // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily
+            // determined
+            // when the partition is actually read.
+            switch (startupMode) {
+                case SPECIFIC_OFFSETS:
+                    if (specificStartupOffsets == null) {
+                        throw new IllegalStateException(
+                                "Startup mode for the consumer set to "
+                                        + StartupMode.SPECIFIC_OFFSETS
+                                        + ", but no specific offsets were specified.");
+                    }
+
+                    for (KafkaTopicPartition seedPartition : allPartitions) {
+                        Long specificOffset = specificStartupOffsets.get(seedPartition);
+                        if (specificOffset != null) {
+                            // since the specified offsets represent the next record to read, we
+                            // subtract
+                            // it by one so that the initial state of the consumer will be correct
+                            subscribedPartitionsToStartOffsets.put(
+                                    seedPartition, specificOffset - 1);
+                        } else {
+                            // default to group offset behaviour if the user-provided specific
+                            // offsets
+                            // do not contain a value for this partition
+                            subscribedPartitionsToStartOffsets.put(
+                                    seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+                        }
+                    }
+
+                    break;
+                case TIMESTAMP:
+                    if (startupOffsetsTimestamp == null) {
+                        throw new IllegalStateException(
+                                "Startup mode for the consumer set to "
+                                        + StartupMode.TIMESTAMP
+                                        + ", but no startup timestamp was specified.");
+                    }
+
+                    for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset :
+                            fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp)
+                                    .entrySet()) {
+                        subscribedPartitionsToStartOffsets.put(
+                                partitionToOffset.getKey(),
+                                (partitionToOffset.getValue() == null)
+                                        // if an offset cannot be retrieved for a partition with the
+                                        // given timestamp,
+                                        // we default to using the latest offset for the partition
+                                        ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
+                                        // since the specified offsets represent the next record to
+                                        // read, we subtract
+                                        // it by one so that the initial state of the consumer will
+                                        // be correct
+                                        : partitionToOffset.getValue() - 1);
+                    }
+
+                    break;
+                default:
+                    for (KafkaTopicPartition seedPartition : allPartitions) {
+                        subscribedPartitionsToStartOffsets.put(
+                                seedPartition, startupMode.getStateSentinel());
+                    }
+            }
+
+            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
+                switch (startupMode) {
+                    case EARLIEST:
+                        LOG.info(
+                                "Consumer subtask {} will start reading the following {} partitions from the earliest"
+                                        + " offsets: {}",
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                subscribedPartitionsToStartOffsets.size(),
+                                subscribedPartitionsToStartOffsets.keySet());
+                        break;
+                    case LATEST:
+                        LOG.info(
+                                "Consumer subtask {} will start reading the following {} partitions from the latest "
+                                        + "offsets: {}",
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                subscribedPartitionsToStartOffsets.size(),
+                                subscribedPartitionsToStartOffsets.keySet());
+                        break;
+                    case TIMESTAMP:
+                        LOG.info(
+                                "Consumer subtask {} will start reading the following {} partitions from timestamp "
+                                        + "{}: {}",
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                subscribedPartitionsToStartOffsets.size(),
+                                startupOffsetsTimestamp,
+                                subscribedPartitionsToStartOffsets.keySet());
+                        break;
+                    case SPECIFIC_OFFSETS:
+                        LOG.info(
+                                "Consumer subtask {} will start reading the following {} partitions from the "
+                                        + "specified startup offsets {}: {}",
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                subscribedPartitionsToStartOffsets.size(),
+                                specificStartupOffsets,
+                                subscribedPartitionsToStartOffsets.keySet());
+
+                        List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets =
+                                new ArrayList<>(subscribedPartitionsToStartOffsets.size());
+                        for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
+                                subscribedPartitionsToStartOffsets.entrySet()) {
+                            if (subscribedPartition.getValue()
+                                    == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
+                                partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
+                            }
+                        }
+
+                        if (partitionsDefaultedToGroupOffsets.size() > 0) {
+                            LOG.warn(
+                                    "Consumer subtask {} cannot find offsets for the following {} partitions in the "
+                                            + "specified startup offsets: {}"
+                                            + "; their startup offsets will be defaulted to their committed group "
+                                            + "offsets in Kafka.",
+                                    getRuntimeContext().getIndexOfThisSubtask(),
+                                    partitionsDefaultedToGroupOffsets.size(),
+                                    partitionsDefaultedToGroupOffsets);
+                        }
+                        break;
+                    case GROUP_OFFSETS:
+                        LOG.info(
+                                "Consumer subtask {} will start reading the following {} partitions from the "
+                                        + "committed group offsets in Kafka: {}",
+                                getRuntimeContext().getIndexOfThisSubtask(),
+                                subscribedPartitionsToStartOffsets.size(),
+                                subscribedPartitionsToStartOffsets.keySet());
+                }
+            } else {
+                LOG.info(
+                        "Consumer subtask {} initially has no partitions to read from.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+            }
+        }
+
+        this.deserializer.open(
+                RuntimeContextInitializationContextAdapters.deserializationAdapter(
+                        getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
+    }
+
+    @Override
+    public void run(SourceContext<T> sourceContext) throws Exception {
+
+        if (StringUtils.isNotEmpty(this.inlongMetric)) {
+            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+            String groupId = inlongMetricArray[0];
+            String streamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            AuditImp auditImp = null;
+            if (inlongAudit != null) {
+                AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
+                auditImp = AuditImp.getInstance();
+            }
+            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, getRuntimeContext().getMetricGroup(),
+                    auditImp);
+            ThreadSafeCounter recordsInCounter = new ThreadSafeCounter();
+            ThreadSafeCounter bytesInCounter = new ThreadSafeCounter();
+            if (metricState != null) {
+                recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN));
+                bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN));
+            }
+            sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter);
+            sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter);
+            sourceMetricData.registerMetricsForNumRecordsInForMeter(new ThreadSafeCounter());
+            sourceMetricData.registerMetricsForNumBytesInForMeter(new ThreadSafeCounter());
+            sourceMetricData.registerMetricsForNumBytesInPerSecond();
+            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+            if (this.deserializer instanceof DynamicKafkaDeserializationSchema) {
+                DynamicKafkaDeserializationSchema dynamicKafkaDeserializationSchema =
+                        (DynamicKafkaDeserializationSchema) deserializer;
+                dynamicKafkaDeserializationSchema.setMetricData(sourceMetricData);
+            }
+        }
+
+        if (subscribedPartitionsToStartOffsets == null) {
+            throw new Exception("The partitions were not set for the consumer");
+        }
+
+        // initialize commit metrics and default offset callback method
+        this.successfulCommits =
+                this.getRuntimeContext()
+                        .getMetricGroup()
+                        .counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
+        this.failedCommits =
+                this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
+        final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
+
+        this.offsetCommitCallback =
+                new KafkaCommitCallback() {
+                    @Override
+                    public void onSuccess() {
+                        successfulCommits.inc();
+                    }
+
+                    @Override
+                    public void onException(Throwable cause) {
+                        LOG.warn(
+                                String.format(
+                                        "Consumer subtask %d failed async Kafka commit.",
+                                        subtaskIndex),
+                                cause);
+                        failedCommits.inc();
+                    }
+                };
+
+        // mark the subtask as temporarily idle if there are no initial seed partitions;
+        // once this subtask discovers some partitions and starts collecting records, the subtask's
+        // status will automatically be triggered back to be active.
+        if (subscribedPartitionsToStartOffsets.isEmpty()) {
+            sourceContext.markAsTemporarilyIdle();
+        }
+
+        LOG.info(
+                "Consumer subtask {} creating fetcher with offsets {}.",
+                getRuntimeContext().getIndexOfThisSubtask(),
+                subscribedPartitionsToStartOffsets);
+        // from this point forward:
+        //   - 'snapshotState' will draw offsets from the fetcher,
+        //     instead of being built from `subscribedPartitionsToStartOffsets`
+        //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
+        //     Kafka through the fetcher, if configured to do so)
+        this.kafkaFetcher =
+                createFetcher(
+                        sourceContext,
+                        subscribedPartitionsToStartOffsets,
+                        watermarkStrategy,
+                        (StreamingRuntimeContext) getRuntimeContext(),
+                        offsetCommitMode,
+                        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
+                        useMetrics);
+
+        if (!running) {
+            return;
+        }
+
+        // depending on whether we were restored with the current state version (1.3),
+        // remaining logic branches off into 2 paths:
+        //  1) New state - partition discovery loop executed as separate thread, with this
+        //                 thread running the main fetcher loop
+        //  2) Old state - partition discovery is disabled and only the main fetcher loop is
+        // executed
+        if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
+            kafkaFetcher.runFetchLoop();
+        } else {
+            runWithPartitionDiscovery();
+        }
+    }
+
+    /**
+     * Runs the fetch loop on the calling thread while a separate thread periodically discovers new
+     * partitions. Once the fetch loop returns, the discovery thread is woken up and joined, and any
+     * exception it recorded is rethrown wrapped in a {@link RuntimeException}.
+     *
+     * @throws Exception if the fetch loop fails or joining the discovery thread is interrupted
+     */
+    private void runWithPartitionDiscovery() throws Exception {
+        final AtomicReference<Exception> discoveryFailure = new AtomicReference<>();
+        createAndStartDiscoveryLoop(discoveryFailure);
+
+        kafkaFetcher.runFetchLoop();
+
+        // the fetch loop is done; wake the discoverer up so that the discovery thread can exit,
+        // then wait for it
+        partitionDiscoverer.wakeup();
+        joinDiscoveryLoopThread();
+
+        // surface an error recorded by the discovery loop, if there is one
+        final Exception failure = discoveryFailure.get();
+        if (failure == null) {
+            return;
+        }
+        throw new RuntimeException(failure);
+    }
+
+    /**
+     * Waits for the partition discovery thread to terminate; returns immediately if no such thread
+     * has been created.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    @VisibleForTesting
+    void joinDiscoveryLoopThread() throws InterruptedException {
+        if (discoveryLoopThread == null) {
+            return;
+        }
+        discoveryLoopThread.join();
+    }
+
+    /**
+     * Creates and starts the thread that repeatedly discovers new partitions while the source is running.
+     *
+     * <p>Each iteration asks the partition discoverer for new partitions, hands non-empty results to the
+     * fetcher and then sleeps for {@code discoveryIntervalMillis} (no sleep if the interval is 0). The loop
+     * ends when {@code running} becomes false, when the discoverer reports that it was woken up or closed,
+     * or when the sleep is interrupted. Any other exception is stored in {@code discoveryLoopErrorRef}.
+     * When the loop ends while the source is still running, {@link #cancel()} is invoked.
+     *
+     * @param discoveryLoopErrorRef receives the exception that terminated the discovery loop, if any
+     */
+    private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {
+        discoveryLoopThread =
+                new Thread(
+                        () -> {
+                            try {
+                                // --------------------- partition discovery loop
+                                // ---------------------
+
+                                // throughout the loop, we always eagerly check if we are still
+                                // running before
+                                // performing the next operation, so that we can escape the loop as
+                                // soon as possible
+
+                                while (running) {
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug(
+                                                "Consumer subtask {} is trying to discover new partitions ...",
+                                                getRuntimeContext().getIndexOfThisSubtask());
+                                    }
+
+                                    final List<KafkaTopicPartition> discoveredPartitions;
+                                    try {
+                                        discoveredPartitions =
+                                                partitionDiscoverer.discoverPartitions();
+                                    } catch (AbstractPartitionDiscoverer.WakeupException
+                                            | AbstractPartitionDiscoverer.ClosedException e) {
+                                        // the partition discoverer may have been closed or woken up
+                                        // before or during the discovery;
+                                        // this would only happen if the consumer was canceled;
+                                        // simply escape the loop
+                                        break;
+                                    }
+
+                                    // no need to add the discovered partitions if we were closed
+                                    // in the meantime
+                                    if (running && !discoveredPartitions.isEmpty()) {
+                                        kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
+                                    }
+
+                                    // do not waste any time sleeping if we're not running anymore
+                                    if (running && discoveryIntervalMillis != 0) {
+                                        try {
+                                            Thread.sleep(discoveryIntervalMillis);
+                                        } catch (InterruptedException iex) {
+                                            // may be interrupted if the consumer was canceled
+                                            // midway; simply escape the loop
+                                            break;
+                                        }
+                                    }
+                                }
+                            } catch (Exception e) {
+                                discoveryLoopErrorRef.set(e);
+                            } finally {
+                                // calling cancel will also let the fetcher loop escape
+                                // (if not running, cancel() was already called)
+                                if (running) {
+                                    cancel();
+                                }
+                            }
+                        },
+                        "Kafka Partition Discovery for "
+                                + getRuntimeContext().getTaskNameWithSubtasks());
+
+        discoveryLoopThread.start();
+    }
+
+    @Override
+    public void cancel() {
+        // set ourselves as not running;
+        // this would let the main discovery loop escape as soon as possible
+        running = false;
+
+        if (discoveryLoopThread != null) {
+
+            if (partitionDiscoverer != null) {
+                // we cannot close the discoverer here, as it is error-prone to concurrent access;
+                // only wakeup the discoverer, the discovery loop will clean itself up after it
+                // escapes
+                partitionDiscoverer.wakeup();
+            }
+
+            // the discovery loop may currently be sleeping in-between
+            // consecutive discoveries; interrupt to shutdown faster
+            discoveryLoopThread.interrupt();
+        }
+
+        // abort the fetcher, if there is one
+        if (kafkaFetcher != null) {
+            kafkaFetcher.cancel();
+        }
+    }
+
+    /**
+     * Cancels the source, waits for the discovery thread, then closes the partition discoverer and the
+     * super class. Both close steps are always attempted; if both fail, the two exceptions are combined
+     * through {@code ExceptionUtils.firstOrSuppressed} before being thrown.
+     *
+     * @throws Exception the failure raised while closing, if any
+     */
+    @Override
+    public void close() throws Exception {
+        cancel();
+
+        joinDiscoveryLoopThread();
+
+        Exception closeFailure = null;
+
+        if (partitionDiscoverer != null) {
+            try {
+                partitionDiscoverer.close();
+            } catch (Exception discovererFailure) {
+                closeFailure = discovererFailure;
+            }
+        }
+
+        try {
+            super.close();
+        } catch (Exception superFailure) {
+            closeFailure = ExceptionUtils.firstOrSuppressed(superFailure, closeFailure);
+        }
+
+        if (closeFailure != null) {
+            throw closeFailure;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint and restore
+    // ------------------------------------------------------------------------
+
+    /**
+     * Registers the union list states for the partition offsets and, when {@code inlongMetric} is set,
+     * for the InLong metric values. On restore, the offsets are collected into {@code restoredState} and
+     * the metric values are restored for this subtask.
+     *
+     * @param context the context that provides the operator state store and the restore flag
+     * @throws Exception if accessing the operator state fails
+     */
+    @Override
+    public final void initializeState(FunctionInitializationContext context) throws Exception {
+
+        OperatorStateStore stateStore = context.getOperatorStateStore();
+
+        this.unionOffsetStates =
+                stateStore.getUnionListState(
+                        new ListStateDescriptor<>(
+                                OFFSETS_STATE_NAME,
+                                createStateSerializer(getRuntimeContext().getExecutionConfig())));
+
+        if (this.inlongMetric != null) {
+            this.metricStateListState =
+                    stateStore.getUnionListState(
+                            new ListStateDescriptor<>(
+                                    INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                            })));
+        }
+
+        if (context.isRestored()) {
+            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
+            // populate actual holder for restored state
+            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
+                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
+            }
+
+            // the metric state is only registered when inlongMetric is configured (see above);
+            // skip the restore otherwise instead of handing a null state to the helper
+            if (metricStateListState != null) {
+                metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        getRuntimeContext().getNumberOfParallelSubtasks());
+            }
+
+            LOG.info(
+                    "Consumer subtask {} restored state: {}.",
+                    getRuntimeContext().getIndexOfThisSubtask(),
+                    restoredState);
+        } else {
+            LOG.info(
+                    "Consumer subtask {} has no restore state.",
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+
+    /**
+     * Snapshots the partition offsets into the union offsets state and, when InLong metrics are active,
+     * the metric values into the metric state. In {@code ON_CHECKPOINTS} commit mode the offsets are also
+     * remembered under the checkpoint id so that {@code notifyCheckpointComplete} can commit them later.
+     * Does nothing once the source is no longer running.
+     *
+     * @param context the context that provides the id of the checkpoint being taken
+     * @throws Exception if writing to the operator state fails
+     */
+    @Override
+    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
+        if (!running) {
+            LOG.debug("snapshotState() called on closed source");
+            return;
+        }
+
+        unionOffsetStates.clear();
+
+        // only one checkpoint call (snapshotState() or notifyCheckpointComplete()) can happen on
+        // this function at a time, so the pending offsets map is never updated concurrently
+        final boolean commitOnCheckpoints = offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS;
+
+        final AbstractFetcher<?, ?> activeFetcher = this.kafkaFetcher;
+        if (activeFetcher != null) {
+            final HashMap<KafkaTopicPartition, Long> fetcherOffsets =
+                    activeFetcher.snapshotCurrentState();
+
+            if (commitOnCheckpoints) {
+                pendingOffsetsToCommit.put(context.getCheckpointId(), fetcherOffsets);
+            }
+
+            for (Map.Entry<KafkaTopicPartition, Long> offsetEntry : fetcherOffsets.entrySet()) {
+                unionOffsetStates.add(Tuple2.of(offsetEntry.getKey(), offsetEntry.getValue()));
+            }
+        } else {
+            // the fetcher has not been created yet, so fall back to the originally restored
+            // offsets or the assigned partitions
+            for (Map.Entry<KafkaTopicPartition, Long> startOffsetEntry :
+                    subscribedPartitionsToStartOffsets.entrySet()) {
+                unionOffsetStates.add(
+                        Tuple2.of(startOffsetEntry.getKey(), startOffsetEntry.getValue()));
+            }
+
+            if (commitOnCheckpoints) {
+                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
+            }
+        }
+
+        if (commitOnCheckpoints) {
+            // drop the oldest entries so that the map of pending offsets cannot grow without bound
+            while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
+                pendingOffsetsToCommit.remove(0);
+            }
+        }
+
+        if (sourceMetricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+
+    /**
+     * Commits the offsets that were remembered for the completed checkpoint through the fetcher. Only
+     * acts in {@code ON_CHECKPOINTS} commit mode, while the source is running and once the fetcher exists.
+     * Entries of older checkpoints are discarded along the way.
+     *
+     * @param checkpointId the id of the checkpoint that completed
+     * @throws Exception if committing fails while the source is still running
+     */
+    @Override
+    public final void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (!running) {
+            LOG.debug("notifyCheckpointComplete() called on closed source");
+            return;
+        }
+
+        final AbstractFetcher<?, ?> activeFetcher = this.kafkaFetcher;
+        if (activeFetcher == null) {
+            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
+            return;
+        }
+
+        if (offsetCommitMode != OffsetCommitMode.ON_CHECKPOINTS) {
+            return;
+        }
+
+        // only one commit operation must be in progress
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
+                    getRuntimeContext().getIndexOfThisSubtask(),
+                    checkpointId);
+        }
+
+        try {
+            final int checkpointIndex = pendingOffsetsToCommit.indexOf(checkpointId);
+            if (checkpointIndex == -1) {
+                LOG.warn(
+                        "Consumer subtask {} received confirmation for unknown checkpoint id {}",
+                        getRuntimeContext().getIndexOfThisSubtask(),
+                        checkpointId);
+                return;
+            }
+
+            @SuppressWarnings("unchecked")
+            final Map<KafkaTopicPartition, Long> offsetsToCommit =
+                    (Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(checkpointIndex);
+
+            // the entries in front of the completed checkpoint belong to older checkpoints
+            for (int dropped = 0; dropped < checkpointIndex; dropped++) {
+                pendingOffsetsToCommit.remove(0);
+            }
+
+            if (offsetsToCommit == null || offsetsToCommit.isEmpty()) {
+                LOG.debug(
+                        "Consumer subtask {} has empty checkpoint state.",
+                        getRuntimeContext().getIndexOfThisSubtask());
+                return;
+            }
+
+            activeFetcher.commitInternalOffsetsToKafka(offsetsToCommit, offsetCommitCallback);
+        } catch (Exception e) {
+            // failures are only propagated while the source is still running
+            if (running) {
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Intentionally does nothing when a checkpoint is aborted.
+     *
+     * @param checkpointId the id of the aborted checkpoint (unused)
+     */
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) {
+        // no-op
+    }
+
+    // ------------------------------------------------------------------------
+    //  Kafka Consumer specific methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Creates the fetcher that connects to the Kafka brokers, pulls data, deserializes the data, and
+     * emits it into the data streams.
+     *
+     * @param sourceContext The source context to emit data to.
+     * @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should
+     *         handle, with their start offsets.
+     * @param watermarkStrategy Optional, a serialized WatermarkStrategy.
+     * @param runtimeContext The task's runtime context.
+     * @param offsetCommitMode The offset commit mode of this consumer.
+     * @param kafkaMetricGroup The metric group that the caller created for the Kafka consumer metrics.
+     * @param useMetrics The consumer's {@code useMetrics} setting; presumably controls whether the
+     *         fetcher exposes Kafka metrics (TODO confirm against the implementations).
+     * @return The instantiated fetcher
+     * @throws Exception The method should forward exceptions
+     */
+    protected abstract AbstractFetcher<T, ?> createFetcher(
+            SourceContext<T> sourceContext,
+            Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
+            SerializedValue<WatermarkStrategy<T>> watermarkStrategy,
+            StreamingRuntimeContext runtimeContext,
+            OffsetCommitMode offsetCommitMode,
+            MetricGroup kafkaMetricGroup,
+            boolean useMetrics)
+            throws Exception;
+
+    /**
+     * Creates the partition discoverer that is used to find new partitions for this subtask. The
+     * returned discoverer is opened by the caller.
+     *
+     * @param topicsDescriptor Descriptor that describes whether we are discovering partitions for
+     *         fixed topics or a topic pattern.
+     * @param indexOfThisSubtask The index of this consumer subtask.
+     * @param numParallelSubtasks The total number of parallel consumer subtasks.
+     * @return The instantiated partition discoverer
+     */
+    protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(
+            KafkaTopicsDescriptor topicsDescriptor,
+            int indexOfThisSubtask,
+            int numParallelSubtasks);
+
+    /**
+     * Implemented by the concrete consumer. NOTE(review): presumably reports whether Kafka's periodic
+     * auto commit is enabled in the consumer configuration, as an input for choosing the offset commit
+     * mode; confirm against the implementation and the call site.
+     *
+     * @return the auto-commit flag of the concrete consumer
+     */
+    protected abstract boolean getIsAutoCommitEnabled();
+
+    /**
+     * Looks up, for each of the given partitions, the offset to start from for the given timestamp.
+     * Used for the {@code TIMESTAMP} startup mode.
+     *
+     * @param partitions the partitions to look offsets up for
+     * @param timestamp the startup timestamp
+     * @return the offset per partition; the caller treats a {@code null} value as "no offset could be
+     *         retrieved" and falls back to the latest offset for that partition
+     */
+    protected abstract Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
+            Collection<KafkaTopicPartition> partitions, long timestamp);
+
+    // ------------------------------------------------------------------------
+    //  ResultTypeQueryable methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Returns the type of the produced records, as reported by the deserializer.
+     *
+     * @return the type information of the records this source emits
+     */
+    @Override
+    public TypeInformation<T> getProducedType() {
+        final TypeInformation<T> producedType = deserializer.getProducedType();
+        return producedType;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Test utilities
+    // ------------------------------------------------------------------------
+
+    /** Returns the partitions subscribed by this subtask with their start offsets (for tests). */
+    @VisibleForTesting
+    Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() {
+        return this.subscribedPartitionsToStartOffsets;
+    }
+
+    /** Returns the offsets restored from state, or {@code null} if nothing was restored (for tests). */
+    @VisibleForTesting
+    TreeMap<KafkaTopicPartition, Long> getRestoredState() {
+        return this.restoredState;
+    }
+
+    /** Returns the offset commit mode of this consumer (for tests). */
+    @VisibleForTesting
+    OffsetCommitMode getOffsetCommitMode() {
+        return this.offsetCommitMode;
+    }
+
+    /** Returns the offsets that are pending a commit, keyed by checkpoint id (for tests). */
+    @VisibleForTesting
+    LinkedMap getPendingOffsetsToCommit() {
+        return this.pendingOffsetsToCommit;
+    }
+
+    /** Returns the configured {@code enableCommitOnCheckpoints} flag (for tests). */
+    @VisibleForTesting
+    public boolean getEnableCommitOnCheckpoints() {
+        return this.enableCommitOnCheckpoints;
+    }
+
+    /**
+     * Creates state serializer for kafka topic partition to offset tuple. Using of the explicit
+     * state serializer with KryoSerializer is needed because otherwise users cannot use
+     * 'disableGenericTypes' properties with KafkaConsumer.
+     */
+    @VisibleForTesting
+    static TupleSerializer<Tuple2<KafkaTopicPartition, Long>> createStateSerializer(
+            ExecutionConfig executionConfig) {
+        // explicit serializer will keep the compatibility with GenericTypeInformation and allow to
+        // disableGenericTypes for users
+        TypeSerializer<?>[] fieldSerializers =
+                new TypeSerializer<?>[]{
+                        new KryoSerializer<>(KafkaTopicPartition.class, executionConfig),
+                        LongSerializer.INSTANCE
+                };
+        @SuppressWarnings("unchecked")
+        Class<Tuple2<KafkaTopicPartition, Long>> tupleClass =
+                (Class<Tuple2<KafkaTopicPartition, Long>>) (Class<?>) Tuple2.class;
+        return new TupleSerializer<>(tupleClass, fieldSerializers);
+    }
+}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index b2efd2c3e..3f0902c0c 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
@@ -57,9 +58,10 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.metric.ThreadSafeCounter;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -78,7 +80,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -93,9 +94,13 @@ import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * Copy from org.apache.flink:flink-connector-kafka_2.11:1.13.5
@@ -256,6 +261,10 @@ public class FlinkKafkaProducer<IN>
     private SinkMetricData metricData;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
+
+    private transient ListState<MetricState> metricStateListState;
+
+    private MetricState metricState;
     /**
      * State for nextTransactionalIdHint.
      */
@@ -910,27 +919,27 @@ public class FlinkKafkaProducer<IN>
             inlongGroupId = inlongMetricArray[0];
             inlongStreamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup());
+            metricData = new SinkMetricData(inlongGroupId, inlongStreamId, nodeId, ctx.getMetricGroup(),
+                    auditHostAndPorts);
             metricData.registerMetricsForDirtyBytes(new ThreadSafeCounter());
             metricData.registerMetricsForDirtyRecords(new ThreadSafeCounter());
             metricData.registerMetricsForNumBytesOut(new ThreadSafeCounter());
             metricData.registerMetricsForNumRecordsOut(new ThreadSafeCounter());
+            metricData.registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter());
+            metricData.registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter());
             metricData.registerMetricsForNumBytesOutPerSecond();
             metricData.registerMetricsForNumRecordsOutPerSecond();
         }
-
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
+        if (metricState != null && metricData != null) {
+            metricData.getNumBytesOut().inc(metricState.getMetricValue(NUM_BYTES_OUT));
+            metricData.getNumRecordsOut().inc(metricState.getMetricValue(NUM_RECORDS_OUT));
         }
-
         super.open(configuration);
     }
 
     private void sendOutMetrics(Long rowSize, Long dataSize) {
         if (metricData != null) {
-            metricData.getNumRecordsOut().inc(rowSize);
-            metricData.getNumBytesOut().inc(dataSize);
+            metricData.invoke(rowSize, dataSize);
         }
     }
 
@@ -941,23 +950,6 @@ public class FlinkKafkaProducer<IN>
         }
     }
 
-    private void outputMetricForAudit(ProducerRecord<byte[], byte[]> record) {
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_OUTPUT,
-                    inlongGroupId,
-                    inlongStreamId,
-                    System.currentTimeMillis(),
-                    1,
-                    record.value().length);
-        }
-    }
-
-    private void resetMetricSize() {
-        dataSize = 0L;
-        rowSize = 0L;
-    }
-
     // ------------------- Logic for handling checkpoint flushing -------------------------- //
 
     @Override
@@ -965,7 +957,6 @@ public class FlinkKafkaProducer<IN>
             FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)
             throws FlinkKafkaException {
         checkErroneous();
-        resetMetricSize();
 
         ProducerRecord<byte[], byte[]> record;
         if (keyedSchema != null) {
@@ -1029,10 +1020,7 @@ public class FlinkKafkaProducer<IN>
                             + "is a bug.");
         }
 
-        rowSize++;
-        dataSize = dataSize + record.value().length;
-        sendOutMetrics(rowSize, dataSize);
-        outputMetricForAudit(record);
+        sendOutMetrics(1L, (long) record.value().length);
 
         pendingRecords.incrementAndGet();
         transaction.producer.send(record, callback);
@@ -1247,6 +1235,10 @@ public class FlinkKafkaProducer<IN>
                             getRuntimeContext().getNumberOfParallelSubtasks(),
                             nextFreeTransactionalId));
         }
+        if (metricData != null && metricStateListState != null) {
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
     }
 
     @Override
@@ -1260,6 +1252,14 @@ public class FlinkKafkaProducer<IN>
             semantic = FlinkKafkaProducer.Semantic.NONE;
         }
 
+        if (this.inlongMetric != null) {
+            this.metricStateListState =
+                    context.getOperatorStateStore().getUnionListState(
+                            new ListStateDescriptor<>(
+                                    INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                            })));
+        }
+
         nextTransactionalIdHintState =
                 context.getOperatorStateStore()
                         .getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
@@ -1313,6 +1313,11 @@ public class FlinkKafkaProducer<IN>
             }
         }
 
+        if (context.isRestored()) {
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+
         super.initializeState(context);
     }
 
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index 17e92abda..c6b5c11a9 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -28,23 +28,18 @@ import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.audit.AuditImp;
-import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
-import static org.apache.inlong.sort.base.Constants.DELIMITER;
 
 /**
  * deserialization schema for {@link KafkaDynamicSource}.
  */
-class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
 
     private static final long serialVersionUID = 1L;
 
@@ -63,18 +58,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
 
     private final boolean upsertMode;
 
-    private final String inlongMetric;
-
     private SourceMetricData metricData;
 
-    private String inlongGroupId;
-
-    private String auditHostAndPorts;
-
-    private String inlongStreamId;
-
-    private transient AuditImp auditImp;
-
     DynamicKafkaDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -84,9 +69,7 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
             boolean hasMetadata,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
-            boolean upsertMode,
-            String inlongMetric,
-            String auditHostAndPorts) {
+            boolean upsertMode) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -105,9 +88,10 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
                         upsertMode);
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
-        this.inlongMetric = inlongMetric;
-        this.auditHostAndPorts = auditHostAndPorts;
+    }
 
+    public void setMetricData(SourceMetricData metricData) {
+        this.metricData = metricData;
     }
 
     @Override
@@ -116,21 +100,6 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
             keyDeserialization.open(context);
         }
         valueDeserialization.open(context);
-        if (inlongMetric != null && !inlongMetric.isEmpty()) {
-            String[] inlongMetricArray = inlongMetric.split(DELIMITER);
-            inlongGroupId = inlongMetricArray[0];
-            inlongStreamId = inlongMetricArray[1];
-            String nodeId = inlongMetricArray[2];
-            metricData = new SourceMetricData(inlongGroupId, inlongStreamId, nodeId, context.getMetricGroup());
-            metricData.registerMetricsForNumBytesIn();
-            metricData.registerMetricsForNumBytesInPerSecond();
-            metricData.registerMetricsForNumRecordsIn();
-            metricData.registerMetricsForNumRecordsInPerSecond();
-        }
-        if (auditHostAndPorts != null) {
-            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
-            auditImp = AuditImp.getInstance();
-        }
     }
 
     @Override
@@ -178,26 +147,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
     }
 
     private void outputMetrics(ConsumerRecord<byte[], byte[]> record) {
-        outputMetricForFlink(record);
-        outputMetricForAudit(record);
-    }
-
-    private void outputMetricForAudit(ConsumerRecord<byte[], byte[]> record) {
-        if (auditImp != null) {
-            auditImp.add(
-                    Constants.AUDIT_SORT_INPUT,
-                    inlongGroupId,
-                    inlongStreamId,
-                    System.currentTimeMillis(),
-                    1,
-                    record.value().length);
-        }
-    }
-
-    private void outputMetricForFlink(ConsumerRecord<byte[], byte[]> record) {
         if (metricData != null) {
-            metricData.getNumBytesIn().inc(record.value().length);
-            metricData.getNumRecordsIn().inc(1);
+            metricData.outputMetrics(1, record.value().length);
         }
     }
 
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index f3580a8f1..af784aad4 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -41,13 +40,12 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
-
+import org.apache.inlong.sort.kafka.FlinkKafkaConsumer;
 import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Header;
 
 import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -74,14 +72,21 @@ public class KafkaDynamicSource
     // Mutable attributes
     // --------------------------------------------------------------------------------------------
 
-    /** Data type that describes the final output of the source. */
+    /**
+     * Data type that describes the final output of the source.
+     */
     protected DataType producedDataType;
 
-    /** Metadata that is appended at the end of a physical source row. */
+    /**
+     * Metadata that is appended at the end of a physical source row.
+     */
     protected List<String> metadataKeys;
 
-    /** Watermark strategy that is used to generate per-partition watermark. */
-    protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
+    /**
+     * Watermark strategy that is used to generate per-partition watermark.
+     */
+    protected @Nullable
+    WatermarkStrategy<RowData> watermarkStrategy;
 
     // --------------------------------------------------------------------------------------------
     // Format attributes
@@ -89,35 +94,55 @@ public class KafkaDynamicSource
 
     private static final String VALUE_METADATA_PREFIX = "value.";
 
-    /** Data type to configure the formats. */
+    /**
+     * Data type to configure the formats.
+     */
     protected final DataType physicalDataType;
 
-    /** Optional format for decoding keys from Kafka. */
-    protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
+    /**
+     * Optional format for decoding keys from Kafka.
+     */
+    protected final @Nullable
+    DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
 
-    /** Format for decoding values from Kafka. */
+    /**
+     * Format for decoding values from Kafka.
+     */
     protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
 
-    /** Indices that determine the key fields and the target position in the produced row. */
+    /**
+     * Indices that determine the key fields and the target position in the produced row.
+     */
     protected final int[] keyProjection;
 
-    /** Indices that determine the value fields and the target position in the produced row. */
+    /**
+     * Indices that determine the value fields and the target position in the produced row.
+     */
     protected final int[] valueProjection;
 
-    /** Prefix that needs to be removed from fields when constructing the physical data type. */
-    protected final @Nullable String keyPrefix;
+    /**
+     * Prefix that needs to be removed from fields when constructing the physical data type.
+     */
+    protected final @Nullable
+    String keyPrefix;
 
     // --------------------------------------------------------------------------------------------
     // Kafka-specific attributes
     // --------------------------------------------------------------------------------------------
 
-    /** The Kafka topics to consume. */
+    /**
+     * The Kafka topics to consume.
+     */
     protected final List<String> topics;
 
-    /** The Kafka topic pattern to consume. */
+    /**
+     * The Kafka topic pattern to consume.
+     */
     protected final Pattern topicPattern;
 
-    /** Properties for the Kafka consumer. */
+    /**
+     * Properties for the Kafka consumer.
+     */
     protected final Properties properties;
 
     /**
@@ -137,7 +162,9 @@ public class KafkaDynamicSource
      */
     protected final long startupTimestampMillis;
 
-    /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */
+    /**
+     * Flag to determine source mode. In upsert mode, it will keep the tombstone message.
+     */
     protected final boolean upsertMode;
 
     protected final String inlongMetric;
@@ -214,7 +241,7 @@ public class KafkaDynamicSource
 
         final FlinkKafkaConsumer<RowData> kafkaConsumer =
                 createKafkaConsumer(keyDeserialization, valueDeserialization,
-                    producedTypeInfo, inlongMetric, auditHostAndPorts);
+                        producedTypeInfo, inlongMetric, auditHostAndPorts);
 
         return SourceFunctionProvider.of(kafkaConsumer, false);
     }
@@ -350,8 +377,8 @@ public class KafkaDynamicSource
             DeserializationSchema<RowData> keyDeserialization,
             DeserializationSchema<RowData> valueDeserialization,
             TypeInformation<RowData> producedTypeInfo,
-        String inlongMetric,
-        String auditHostAndPorts) {
+            String inlongMetric,
+            String auditHostAndPorts) {
 
         final MetadataConverter[] metadataConverters =
                 metadataKeys.stream()
@@ -390,13 +417,15 @@ public class KafkaDynamicSource
                         hasMetadata,
                         metadataConverters,
                         producedTypeInfo,
-                        upsertMode, inlongMetric, auditHostAndPorts);
+                        upsertMode);
 
         final FlinkKafkaConsumer<RowData> kafkaConsumer;
         if (topics != null) {
-            kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
+            kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties, inlongMetric,
+                    auditHostAndPorts);
         } else {
-            kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
+            kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties, inlongMetric,
+                    auditHostAndPorts);
         }
 
         switch (startupMode) {
@@ -425,7 +454,8 @@ public class KafkaDynamicSource
         return kafkaConsumer;
     }
 
-    private @Nullable DeserializationSchema<RowData> createDeserialization(
+    private @Nullable
+    DeserializationSchema<RowData> createDeserialization(
             DynamicTableSource.Context context,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
             int[] projection,
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index a7eebdbcd..e7084d2fa 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -429,6 +429,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp);
             sourceMetricData.registerMetricsForNumRecordsIn();
             sourceMetricData.registerMetricsForNumBytesIn();
+            sourceMetricData.registerMetricsForNumBytesInForMeter();
+            sourceMetricData.registerMetricsForNumRecordsInForMeter();
             sourceMetricData.registerMetricsForNumBytesInPerSecond();
             sourceMetricData.registerMetricsForNumRecordsInPerSecond();
         }
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 5e0bb25d3..1e8b13780 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -570,6 +570,18 @@
   Source  : flink-table-runtime-blink_2.11-13.2-rc2 2.2.1 (Please note that the software have been modified.)
   License : https://github.com/apache/flink/blob/release-1.13.2-rc2/LICENSE
 
+ 1.3.11 inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumer.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+        inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
+  Source  : org.apache.flink:flink-connector-kafka_2.11:1.13.5 (Please note that the software have been modified.)
+  License : https://github.com/apache/flink/blob/master/LICENSE
+
 
 =======================================================================
 Apache InLong Subcomponents: