Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/09 09:41:16 UTC

[inlong] branch master updated: [INLONG-5158][Sort] Sort add metric for kafka source with flink metrics group and audit sdk (#5254)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 73cec9e61 [INLONG-5158][Sort] Sort add metric for kafka source with flink metrics group and audit sdk (#5254)
73cec9e61 is described below

commit 73cec9e61111a70fefe575d8cb56448f841fc6a1
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Tue Aug 9 17:41:11 2022 +0800

    [INLONG-5158][Sort] Sort add metric for kafka source with flink metrics group and audit sdk (#5254)
---
 .../org/apache/inlong/sort/protocol/GroupInfo.java |  14 +
 .../apache/inlong/sort/protocol/InlongMetric.java  |   5 +
 .../org/apache/inlong/sort/base/Constants.java     |  23 +
 inlong-sort/sort-connectors/kafka/pom.xml          |  12 +
 .../table/DynamicKafkaDeserializationSchema.java   | 346 +++++++++++++
 .../sort/kafka/table/KafkaDynamicSource.java       | 551 +++++++++++++++++++++
 .../sort/kafka/table/KafkaDynamicTableFactory.java |  21 +-
 .../inlong/sort/parser/impl/FlinkSqlParser.java    |   4 +
 8 files changed, 972 insertions(+), 4 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java
index 0ec0c728d..516bea0b4 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/GroupInfo.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.sort.protocol;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
 import lombok.Data;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
@@ -39,6 +41,8 @@ public class GroupInfo implements Serializable {
     @JsonProperty("streams")
     private List<StreamInfo> streams;
 
+    private Map<String, String> properties;
+
     /**
      * Information of group.
      * 
@@ -50,6 +54,16 @@ public class GroupInfo implements Serializable {
             @JsonProperty("streams") List<StreamInfo> streams) {
         this.groupId = Preconditions.checkNotNull(groupId, "groupId is null");
         this.streams = Preconditions.checkNotNull(streams, "streams is null");
+        this.properties = new HashMap<>();
+        Preconditions.checkState(!streams.isEmpty(), "streams is empty");
+    }
+
+    public GroupInfo(@JsonProperty("groupId") String groupId,
+        @JsonProperty("streams") List<StreamInfo> streams,
+        Map<String, String> properties) {
+        this.groupId = Preconditions.checkNotNull(groupId, "groupId is null");
+        this.streams = Preconditions.checkNotNull(streams, "streams is null");
+        this.properties = properties;
         Preconditions.checkState(!streams.isEmpty(), "streams is empty");
     }
 }
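
The new three-argument constructor lets a caller attach group-level properties, for example the audit endpoint that FlinkSqlParser picks up further down. A minimal sketch, assuming a non-empty StreamInfo list built elsewhere and a made-up audit address:

    package org.apache.inlong.sort.protocol;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch only: stores the audit endpoint under InlongMetric.AUDIT_KEY so the
    // parser can forward it to connectors; the address is illustrative.
    public final class GroupInfoAuditExample {

        public static GroupInfo withAudit(String groupId, List<StreamInfo> streams) {
            Map<String, String> properties = new HashMap<>();
            properties.put(InlongMetric.AUDIT_KEY, "127.0.0.1:10081");
            // GroupInfo still rejects an empty stream list via Preconditions.checkState
            return new GroupInfo(groupId, streams, properties);
        }
    }
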
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
index 5080f7523..4afb46a89 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/InlongMetric.java
@@ -39,4 +39,9 @@ public interface InlongMetric {
      */
     String METRIC_VALUE_FORMAT = "%s&%s&%s";
 
+    /**
+     * The key of InLong audit, the value should be ip:port&ip:port
+     */
+    String AUDIT_KEY = "inlong.audit";
+
 }
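
The values carried under these keys are plain "&"-delimited strings. A short illustration; the group, stream, node and endpoints below are made up:

    import org.apache.inlong.sort.protocol.InlongMetric;

    class InlongMetricValuesExample {
        static void example() {
            // METRIC_VALUE_FORMAT is "%s&%s&%s": groupId & streamId & nodeId
            String metricValue = String.format(InlongMetric.METRIC_VALUE_FORMAT,
                    "myGroup", "myStream", "node_1");
            // AUDIT_KEY expects "ip:port&ip:port"
            String auditValue = "10.0.0.1:10081&10.0.0.2:10081";
            System.out.println(metricValue + " / " + auditValue);
        }
    }
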
diff --git a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
index bda4e7475..e5a755aa6 100644
--- a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -18,6 +18,9 @@
 
 package org.apache.inlong.sort.base;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
 /**
  * connector base option constant
  */
@@ -51,4 +54,24 @@ public final class Constants {
      */
     public static final String DELIMITER = "&";
 
+    // sort received successfully
+    public static final Integer AUDIT_SORT_INPUT = 7;
+
+    // sort send successfully
+    public static final Integer AUDIT_SORT_OUTPUT = 8;
+
+
+    public static final ConfigOption<String> INLONG_METRIC =
+        ConfigOptions.key("inlong.metric")
+            .stringType()
+            .defaultValue("")
+            .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+
+
+    public static final ConfigOption<String> INLONG_AUDIT =
+        ConfigOptions.key("inlong.audit")
+            .stringType()
+            .defaultValue("")
+            .withDescription("INLONG AUDIT HOST + '&' + PORT");
+
 }
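
Since INLONG_METRIC and INLONG_AUDIT are ConfigOptions, they surface as table options in a Flink SQL DDL. A minimal sketch, not taken from the commit: the 'kafka-inlong' connector identifier, the schema, and the endpoints are assumptions; only the two inlong.* keys come from the constants above.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public final class InlongKafkaSourceDdlExample {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            tableEnv.executeSql(
                    "CREATE TABLE kafka_source (\n"
                            + "  id INT,\n"
                            + "  name STRING\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka-inlong',\n"   // assumed identifier
                            + "  'topic' = 'input_topic',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'format' = 'json',\n"
                            + "  'scan.startup.mode' = 'earliest-offset',\n"
                            + "  'inlong.metric' = 'myGroup&myStream&node_1',\n"
                            + "  'inlong.audit' = '127.0.0.1:10081'\n"
                            + ")");
        }
    }
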
diff --git a/inlong-sort/sort-connectors/kafka/pom.xml b/inlong-sort/sort-connectors/kafka/pom.xml
index dd3a71c7a..3b31a8c96 100644
--- a/inlong-sort/sort-connectors/kafka/pom.xml
+++ b/inlong-sort/sort-connectors/kafka/pom.xml
@@ -37,6 +37,17 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-kafka_${flink.scala.binary.version}</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>audit-sdk</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -54,6 +65,7 @@
                         <configuration>
                             <artifactSet>
                                 <includes>
+                                    <include>org.apache.inlong:*</include>
                                     <include>org.apache.kafka:*</include>
                                     <include>org.apache.flink:flink-connector-kafka_${scala.binary.version}</include>
                                 </includes>
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
new file mode 100644
index 000000000..f09b16b5b
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/**
+ * deserialization schema for {@link KafkaDynamicSource}.
+ */
+class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final @Nullable DeserializationSchema<RowData> keyDeserialization;
+
+    private final DeserializationSchema<RowData> valueDeserialization;
+
+    private final boolean hasMetadata;
+
+    private final BufferingCollector keyCollector;
+
+    private final OutputProjectionCollector outputCollector;
+
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    private final boolean upsertMode;
+
+    private final String inlongMetric;
+
+    private SourceMetricData metricData;
+
+    private String inLongGroupId;
+
+    private String auditHostAndPorts;
+
+    private String inLongStreamId;
+
+    private transient AuditImp auditImp;
+
+    DynamicKafkaDeserializationSchema(
+            int physicalArity,
+            @Nullable DeserializationSchema<RowData> keyDeserialization,
+            int[] keyProjection,
+            DeserializationSchema<RowData> valueDeserialization,
+            int[] valueProjection,
+            boolean hasMetadata,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean upsertMode,
+            String inLongMetric,
+        String auditHostAndPorts) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keyDeserialization != null && keyProjection.length > 0,
+                    "Key must be set in upsert mode for deserialization schema.");
+        }
+        this.keyDeserialization = keyDeserialization;
+        this.valueDeserialization = valueDeserialization;
+        this.hasMetadata = hasMetadata;
+        this.keyCollector = new BufferingCollector();
+        this.outputCollector =
+                new OutputProjectionCollector(
+                        physicalArity,
+                        keyProjection,
+                        valueProjection,
+                        metadataConverters,
+                        upsertMode);
+        this.producedTypeInfo = producedTypeInfo;
+        this.upsertMode = upsertMode;
+        this.inlongMetric = inLongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+
+    }
+
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) throws Exception {
+        if (keyDeserialization != null) {
+            keyDeserialization.open(context);
+        }
+        valueDeserialization.open(context);
+        if (inlongMetric != null && !inlongMetric.isEmpty()) {
+            String[] inLongMetricArray = inlongMetric.split(DELIMITER);
+            inLongGroupId = inLongMetricArray[0];
+            inLongStreamId = inLongMetricArray[1];
+            String nodeId = inLongMetricArray[2];
+            metricData = new SourceMetricData(context.getMetricGroup());
+            metricData.registerMetricsForNumBytesIn(inLongGroupId, inLongStreamId, nodeId, "numBytesIn");
+            metricData.registerMetricsForNumBytesInPerSecond(inLongGroupId, inLongStreamId,
+                nodeId, "numBytesInPerSecond");
+            metricData.registerMetricsForNumRecordsIn(inLongGroupId, inLongStreamId,
+                nodeId, "numRecordsIn");
+            metricData.registerMetricsForNumRecordsInPerSecond(inLongGroupId, inLongStreamId,
+                nodeId, "numRecordsInPerSecond");
+        }
+        if (auditHostAndPorts != null) {
+            AuditImp.getInstance().setAuditProxy(new HashSet<>(Arrays.asList(auditHostAndPorts.split(DELIMITER))));
+            auditImp = AuditImp.getInstance();
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    @Override
+    public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
+        throw new IllegalStateException("A collector is required for deserializing.");
+    }
+
+    @Override
+    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector)
+            throws Exception {
+        // shortcut in case no output projection is required,
+        // also not for a cartesian product with the keys
+        if (keyDeserialization == null && !hasMetadata) {
+            valueDeserialization.deserialize(record.value(), collector);
+            // output metrics
+            outputMetrics(record);
+            return;
+        }
+
+        // buffer key(s)
+        if (keyDeserialization != null) {
+            keyDeserialization.deserialize(record.key(), keyCollector);
+        }
+
+        // project output while emitting values
+        outputCollector.inputRecord = record;
+        outputCollector.physicalKeyRows = keyCollector.buffer;
+        outputCollector.outputCollector = collector;
+
+        if (record.value() == null && upsertMode) {
+            // collect tombstone messages in upsert mode by hand
+            outputCollector.collect(null);
+        } else {
+            valueDeserialization.deserialize(record.value(), outputCollector);
+            // output metrics
+            outputMetrics(record);
+        }
+
+        keyCollector.buffer.clear();
+    }
+
+    private void outputMetrics(ConsumerRecord<byte[], byte[]> record) {
+        outputMetricForFlink(record);
+        outputMetricForAudit(record);
+    }
+
+    private void outputMetricForAudit(ConsumerRecord<byte[], byte[]> record) {
+        if (auditImp != null) {
+            auditImp.add(
+                Constants.AUDIT_SORT_INPUT,
+                inLongGroupId,
+                inLongStreamId,
+                System.currentTimeMillis(),
+                1,
+                record.value().length);
+        }
+    }
+
+    private void outputMetricForFlink(ConsumerRecord<byte[], byte[]> record) {
+        if (metricData != null) {
+            metricData.getNumBytesIn().inc(record.value().length);
+            metricData.getNumRecordsIn().inc(1);
+        }
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    interface MetadataConverter extends Serializable {
+        Object read(ConsumerRecord<?, ?> record);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static final class BufferingCollector implements Collector<RowData>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final List<RowData> buffer = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            buffer.add(record);
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Emits a row with key, value, and metadata fields.
+     *
+     * <p>The collector is able to handle the following kinds of keys:
+     *
+     * <ul>
+     *   <li>No key is used.
+     *   <li>A key is used.
+     *   <li>The deserialization schema emits multiple keys.
+     *   <li>Keys and values have overlapping fields.
+     *   <li>Keys are used and value is null.
+     * </ul>
+     */
+    private static final class OutputProjectionCollector
+            implements Collector<RowData>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final int physicalArity;
+
+        private final int[] keyProjection;
+
+        private final int[] valueProjection;
+
+        private final MetadataConverter[] metadataConverters;
+
+        private final boolean upsertMode;
+
+        private transient ConsumerRecord<?, ?> inputRecord;
+
+        private transient List<RowData> physicalKeyRows;
+
+        private transient Collector<RowData> outputCollector;
+
+        OutputProjectionCollector(
+                int physicalArity,
+                int[] keyProjection,
+                int[] valueProjection,
+                MetadataConverter[] metadataConverters,
+                boolean upsertMode) {
+            this.physicalArity = physicalArity;
+            this.keyProjection = keyProjection;
+            this.valueProjection = valueProjection;
+            this.metadataConverters = metadataConverters;
+            this.upsertMode = upsertMode;
+        }
+
+        @Override
+        public void collect(RowData physicalValueRow) {
+            // no key defined
+            if (keyProjection.length == 0) {
+                emitRow(null, (GenericRowData) physicalValueRow);
+                return;
+            }
+
+            // otherwise emit a value for each key
+            for (RowData physicalKeyRow : physicalKeyRows) {
+                emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+
+        private void emitRow(
+                @Nullable GenericRowData physicalKeyRow,
+                @Nullable GenericRowData physicalValueRow) {
+            final RowKind rowKind;
+            if (physicalValueRow == null) {
+                if (upsertMode) {
+                    rowKind = RowKind.DELETE;
+                } else {
+                    throw new DeserializationException(
+                            "Invalid null value received "
+                                + "in non-upsert mode. Could not to "
+                                + "set row kind for output record.");
+                }
+            } else {
+                rowKind = physicalValueRow.getRowKind();
+            }
+
+            final int metadataArity = metadataConverters.length;
+            final GenericRowData producedRow =
+                    new GenericRowData(rowKind, physicalArity + metadataArity);
+
+            for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
+                assert physicalKeyRow != null;
+                producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos));
+            }
+
+            if (physicalValueRow != null) {
+                for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
+                    producedRow.setField(
+                            valueProjection[valuePos], physicalValueRow.getField(valuePos));
+                }
+            }
+
+            for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+                producedRow.setField(
+                        physicalArity + metadataPos,
+                        metadataConverters[metadataPos].read(inputRecord));
+            }
+
+            outputCollector.collect(producedRow);
+        }
+    }
+}
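
open() above expects the inlong.metric value in the form groupId&streamId&nodeId and the audit value as a "&"-separated endpoint list. A small illustration of that parsing contract; all values are made up:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.inlong.sort.base.Constants;

    class InlongMetricParsingExample {
        static void example() {
            String inlongMetric = "myGroup&myStream&node_1";
            String[] parts = inlongMetric.split(Constants.DELIMITER);
            String groupId = parts[0];   // "myGroup"
            String streamId = parts[1];  // "myStream"
            String nodeId = parts[2];    // "node_1"

            // The audit proxy set handed to AuditImp.setAuditProxy()
            String auditHostAndPorts = "10.0.0.1:10081&10.0.0.2:10081";
            Set<String> auditProxies =
                    new HashSet<>(Arrays.asList(auditHostAndPorts.split(Constants.DELIMITER)));
            System.out.println(groupId + "/" + streamId + "/" + nodeId + " -> " + auditProxies);
        }
    }
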
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
new file mode 100644
index 000000000..d49af2d00
--- /dev/null
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -0,0 +1,551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * Dynamic Kafka table source that supports reading metadata
+ * from Kafka and metric reporting.
+ */
+@Internal
+public class KafkaDynamicSource
+        implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    /** Watermark strategy that is used to generate per-partition watermark. */
+    protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
+
+    // --------------------------------------------------------------------------------------------
+    // Format attributes
+    // --------------------------------------------------------------------------------------------
+
+    private static final String VALUE_METADATA_PREFIX = "value.";
+
+    /** Data type to configure the formats. */
+    protected final DataType physicalDataType;
+
+    /** Optional format for decoding keys from Kafka. */
+    protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
+
+    /** Format for decoding values from Kafka. */
+    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
+
+    /** Indices that determine the key fields and the target position in the produced row. */
+    protected final int[] keyProjection;
+
+    /** Indices that determine the value fields and the target position in the produced row. */
+    protected final int[] valueProjection;
+
+    /** Prefix that needs to be removed from fields when constructing the physical data type. */
+    protected final @Nullable String keyPrefix;
+
+    // --------------------------------------------------------------------------------------------
+    // Kafka-specific attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** The Kafka topics to consume. */
+    protected final List<String> topics;
+
+    /** The Kafka topic pattern to consume. */
+    protected final Pattern topicPattern;
+
+    /** Properties for the Kafka consumer. */
+    protected final Properties properties;
+
+    /**
+     * The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}).
+     */
+    protected final StartupMode startupMode;
+
+    /**
+     * Specific startup offsets; only relevant when startup mode is {@link
+     * StartupMode#SPECIFIC_OFFSETS}.
+     */
+    protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+    /**
+     * The start timestamp to locate partition offsets; only relevant when startup mode is {@link
+     * StartupMode#TIMESTAMP}.
+     */
+    protected final long startupTimestampMillis;
+
+    /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */
+    protected final boolean upsertMode;
+
+    protected final String inLongMetric;
+
+    protected final String auditHostAndPorts;
+
+    public KafkaDynamicSource(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            @Nullable List<String> topics,
+            @Nullable Pattern topicPattern,
+            Properties properties,
+            StartupMode startupMode,
+            Map<KafkaTopicPartition, Long> specificStartupOffsets,
+            long startupTimestampMillis,
+            boolean upsertMode,
+            final String inLongMetric,
+            final String auditHostAndPorts) {
+        // Format attributes
+        this.physicalDataType =
+                Preconditions.checkNotNull(
+                        physicalDataType, "Physical data type must not be null.");
+        this.keyDecodingFormat = keyDecodingFormat;
+        this.valueDecodingFormat =
+                Preconditions.checkNotNull(
+                        valueDecodingFormat, "Value decoding format must not be null.");
+        this.keyProjection =
+                Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
+        this.valueProjection =
+                Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
+        this.keyPrefix = keyPrefix;
+        // Mutable attributes
+        this.producedDataType = physicalDataType;
+        this.metadataKeys = Collections.emptyList();
+        this.watermarkStrategy = null;
+        // Kafka-specific attributes
+        Preconditions.checkArgument(
+                (topics != null && topicPattern == null)
+                        || (topics == null && topicPattern != null),
+                "Either Topic or Topic Pattern must be set for source.");
+        this.topics = topics;
+        this.topicPattern = topicPattern;
+        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+        this.startupMode =
+                Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
+        this.specificStartupOffsets =
+                Preconditions.checkNotNull(
+                        specificStartupOffsets, "Specific offsets must not be null.");
+        this.startupTimestampMillis = startupTimestampMillis;
+        this.upsertMode = upsertMode;
+        this.inLongMetric = inLongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return valueDecodingFormat.getChangelogMode();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+        final DeserializationSchema<RowData> keyDeserialization =
+                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
+
+        final DeserializationSchema<RowData> valueDeserialization =
+                createDeserialization(context, valueDecodingFormat, valueProjection, null);
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        final FlinkKafkaConsumer<RowData> kafkaConsumer =
+                createKafkaConsumer(keyDeserialization, valueDeserialization,
+                    producedTypeInfo, inLongMetric, auditHostAndPorts);
+
+        return SourceFunctionProvider.of(kafkaConsumer, false);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+        // according to convention, the order of the final row must be
+        // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+        // where the format metadata has highest precedence
+
+        // add value format metadata with prefix
+        valueDecodingFormat
+                .listReadableMetadata()
+                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
+
+        // add connector metadata
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        // separate connector and format metadata
+        final List<String> formatMetadataKeys =
+                metadataKeys.stream()
+                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+                        .collect(Collectors.toList());
+        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
+        connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+        // push down format metadata
+        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
+        if (formatMetadata.size() > 0) {
+            final List<String> requestedFormatMetadataKeys =
+                    formatMetadataKeys.stream()
+                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
+                            .collect(Collectors.toList());
+            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        }
+
+        this.metadataKeys = connectorMetadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        final KafkaDynamicSource copy =
+                new KafkaDynamicSource(
+                        physicalDataType,
+                        keyDecodingFormat,
+                        valueDecodingFormat,
+                        keyProjection,
+                        valueProjection,
+                        keyPrefix,
+                        topics,
+                        topicPattern,
+                        properties,
+                        startupMode,
+                        specificStartupOffsets,
+                        startupTimestampMillis,
+                        upsertMode, inLongMetric, auditHostAndPorts);
+        copy.producedDataType = producedDataType;
+        copy.metadataKeys = metadataKeys;
+        copy.watermarkStrategy = watermarkStrategy;
+        return copy;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Kafka table source";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final KafkaDynamicSource that = (KafkaDynamicSource) o;
+        return Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
+                && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
+                && Arrays.equals(keyProjection, that.keyProjection)
+                && Arrays.equals(valueProjection, that.valueProjection)
+                && Objects.equals(keyPrefix, that.keyPrefix)
+                && Objects.equals(topics, that.topics)
+                && Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern))
+                && Objects.equals(properties, that.properties)
+                && startupMode == that.startupMode
+                && Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
+                && startupTimestampMillis == that.startupTimestampMillis
+                && Objects.equals(upsertMode, that.upsertMode)
+                && Objects.equals(watermarkStrategy, that.watermarkStrategy);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                producedDataType,
+                metadataKeys,
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                properties,
+                startupMode,
+                specificStartupOffsets,
+                startupTimestampMillis,
+                upsertMode,
+                watermarkStrategy);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    protected FlinkKafkaConsumer<RowData> createKafkaConsumer(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo,
+        String inlongMetric,
+        String auditHostAndPorts) {
+
+        final MetadataConverter[] metadataConverters =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+
+        // check if connector metadata is used at all
+        final boolean hasMetadata = metadataKeys.size() > 0;
+
+        // adjust physical arity with value format's metadata
+        final int adjustedPhysicalArity =
+                producedDataType.getChildren().size() - metadataKeys.size();
+
+        // adjust value format projection to include value format's metadata columns at the end
+        final int[] adjustedValueProjection =
+                IntStream.concat(
+                                IntStream.of(valueProjection),
+                                IntStream.range(
+                                        keyProjection.length + valueProjection.length,
+                                        adjustedPhysicalArity))
+                        .toArray();
+
+        final KafkaDeserializationSchema<RowData> kafkaDeserializer =
+                new DynamicKafkaDeserializationSchema(
+                        adjustedPhysicalArity,
+                        keyDeserialization,
+                        keyProjection,
+                        valueDeserialization,
+                        adjustedValueProjection,
+                        hasMetadata,
+                        metadataConverters,
+                        producedTypeInfo,
+                        upsertMode, inlongMetric, auditHostAndPorts);
+
+        final FlinkKafkaConsumer<RowData> kafkaConsumer;
+        if (topics != null) {
+            kafkaConsumer = new FlinkKafkaConsumer<>(topics, kafkaDeserializer, properties);
+        } else {
+            kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, kafkaDeserializer, properties);
+        }
+
+        switch (startupMode) {
+            case EARLIEST:
+                kafkaConsumer.setStartFromEarliest();
+                break;
+            case LATEST:
+                kafkaConsumer.setStartFromLatest();
+                break;
+            case GROUP_OFFSETS:
+                kafkaConsumer.setStartFromGroupOffsets();
+                break;
+            case SPECIFIC_OFFSETS:
+                kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
+                break;
+            case TIMESTAMP:
+                kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
+                break;
+        }
+
+        kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
+
+        if (watermarkStrategy != null) {
+            kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
+        }
+        return kafkaConsumer;
+    }
+
+    private @Nullable DeserializationSchema<RowData> createDeserialization(
+            DynamicTableSource.Context context,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        DataType physicalFormatDataType =
+                DataTypeUtils.projectRow(this.physicalDataType, projection);
+        if (prefix != null) {
+            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+        }
+        return format.createRuntimeDecoder(context, physicalFormatDataType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    enum ReadableMetadata {
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return StringData.fromString(record.topic());
+                    }
+                }),
+
+        PARTITION(
+                "partition",
+                DataTypes.INT().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return record.partition();
+                    }
+                }),
+
+        HEADERS(
+                "headers",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
+                        .notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        final Map<StringData, byte[]> map = new HashMap<>();
+                        for (Header header : record.headers()) {
+                            map.put(StringData.fromString(header.key()), header.value());
+                        }
+                        return new GenericMapData(map);
+                    }
+                }),
+
+        LEADER_EPOCH(
+                "leader-epoch",
+                DataTypes.INT().nullable(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return record.leaderEpoch().orElse(null);
+                    }
+                }),
+
+        OFFSET(
+                "offset",
+                DataTypes.BIGINT().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return record.offset();
+                    }
+                }),
+
+        TIMESTAMP(
+                "timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return TimestampData.fromEpochMillis(record.timestamp());
+                    }
+                }),
+
+        TIMESTAMP_TYPE(
+                "timestamp-type",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return StringData.fromString(record.timestampType().toString());
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 97ffe1ec4..c0e41f1b0 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
 import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
@@ -87,6 +86,8 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.get
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG;
 
 /**
@@ -207,6 +208,8 @@ public class KafkaDynamicTableFactory
         options.add(SINK_SEMANTIC);
         options.add(SINK_PARALLELISM);
         options.add(KAFKA_IGNORE_ALL_CHANGELOG);
+        options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
         return options;
     }
 
@@ -251,6 +254,10 @@ public class KafkaDynamicTableFactory
 
         final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
 
+        final String inLongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
+
+        final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+
         return createKafkaTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -263,7 +270,9 @@ public class KafkaDynamicTableFactory
                 properties,
                 startupOptions.startupMode,
                 startupOptions.specificOffsets,
-                startupOptions.startupTimestampMillis);
+                startupOptions.startupTimestampMillis,
+            inLongMetric,
+            auditHostAndPorts);
     }
 
     @Override
@@ -327,7 +336,9 @@ public class KafkaDynamicTableFactory
             Properties properties,
             StartupMode startupMode,
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
-            long startupTimestampMillis) {
+            long startupTimestampMillis,
+            String inLongMetric,
+        String auditHostAndPorts) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -341,7 +352,9 @@ public class KafkaDynamicTableFactory
                 startupMode,
                 specificStartupOffsets,
                 startupTimestampMillis,
-                false);
+                false,
+            inLongMetric,
+            auditHostAndPorts);
     }
 
     protected KafkaDynamicSink createKafkaTableSink(
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index d3003b1ad..9cb307f7e 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -186,6 +186,10 @@ public class FlinkSqlParser implements Parser {
             properties.put(InlongMetric.METRIC_KEY,
                     String.format(InlongMetric.METRIC_VALUE_FORMAT, groupInfo.getGroupId(),
                             streamInfo.getStreamId(), node.getId()));
+            if (StringUtils.isNotEmpty(groupInfo.getProperties().get(InlongMetric.AUDIT_KEY))) {
+                properties.put(InlongMetric.AUDIT_KEY,
+                    groupInfo.getProperties().get(InlongMetric.AUDIT_KEY));
+            }
         });
     }
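
Taken together, the parser leaves every node with a metric label and, when the group carries one, an audit endpoint. An illustrative outcome of the block above, with made-up ids and address:

    // properties.get(InlongMetric.METRIC_KEY) == "myGroup&myStream&node_1"
    // properties.get(InlongMetric.AUDIT_KEY)  == "127.0.0.1:10081"
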