Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/04 09:19:22 UTC

[inlong] branch master updated: [INLONG-5262][Sort] Add metric report for Pulsar source (#5276)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new aa2f8f26e [INLONG-5262][Sort] Add metric report for Pulsar source (#5276)
aa2f8f26e is described below

commit aa2f8f26ed3a8cc3ad736b87e66b58e9417de7cc
Author: Schnapps <zp...@connect.ust.hk>
AuthorDate: Thu Aug 4 17:19:18 2022 +0800

    [INLONG-5262][Sort] Add metric report for Pulsar source (#5276)
---
 inlong-sort/sort-connectors/pulsar/pom.xml         |   6 +
 .../apache/inlong/sort/pulsar/table/Constants.java |  32 ++
 .../table/DynamicPulsarDeserializationSchema.java  | 352 +++++++++++++++
 .../pulsar/table/PulsarDynamicTableFactory.java    |  16 +-
 .../pulsar/table/PulsarDynamicTableSource.java     | 493 +++++++++++++++++++++
 .../table/UpsertPulsarDynamicTableFactory.java     |   7 +-
 .../org.apache.flink.table.factories.Factory       |   3 +-
 7 files changed, 902 insertions(+), 7 deletions(-)

diff --git a/inlong-sort/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-connectors/pulsar/pom.xml
index 9c3b24be5..395dcd0a3 100644
--- a/inlong-sort/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-connectors/pulsar/pom.xml
@@ -54,6 +54,11 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-avro</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -73,6 +78,7 @@
                             <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
                             <artifactSet>
                                 <includes>
+                                    <include>org.apache.inlong:*</include>
                                     <include>io.streamnative.connectors:pulsar-flink-connector-origin*</include>
                                     <include>io.streamnative.connectors:flink-protobuf</include>
                                     <include>org.apache.pulsar:*</include>
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java
new file mode 100644
index 000000000..18fd19955
--- /dev/null
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/Constants.java
@@ -0,0 +1,32 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class Constants {
+
+    public static final ConfigOption<String> INLONG_METRIC =
+        ConfigOptions.key("inlong.metric")
+            .stringType()
+            .defaultValue("")
+            .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+
+}
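
For context, a minimal sketch of how this option could be supplied through a Flink SQL table
definition; the connector identifier, endpoints, and metric value below are illustrative
placeholders rather than values taken from this commit:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PulsarInlongMetricExample {

        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // 'inlong.metric' carries groupId, streamId and nodeId joined by '&',
            // matching the option description above; every other value is a placeholder.
            tableEnv.executeSql(
                    "CREATE TABLE pulsar_source (\n"
                            + "  id INT,\n"
                            + "  name STRING\n"
                            + ") WITH (\n"
                            + "  'connector' = 'pulsar-inlong',\n"
                            + "  'topic' = 'persistent://public/default/demo',\n"
                            + "  'service-url' = 'pulsar://localhost:6650',\n"
                            + "  'admin-url' = 'http://localhost:8080',\n"
                            + "  'format' = 'json',\n"
                            + "  'inlong.metric' = 'demo_group&demo_stream&node_1'\n"
                            + ")");
        }
    }
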
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
new file mode 100644
index 000000000..7f6b1fb95
--- /dev/null
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -0,0 +1,352 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
+import org.apache.flink.streaming.util.serialization.FlinkSchema;
+import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
+import org.apache.flink.streaming.util.serialization.ThreadSafeDeserializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+
+/**
+ * A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}.
+ */
+class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Nullable
+    private final DeserializationSchema<RowData> keyDeserialization;
+
+    private final DeserializationSchema<RowData> valueDeserialization;
+
+    private final boolean hasMetadata;
+
+    private final OutputProjectionCollector outputCollector;
+
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    private final boolean upsertMode;
+
+    private SourceMetricData sourceMetricData;
+
+    private String inlongMetric;
+
+    private static final ThreadLocal<SimpleCollector<RowData>> tlsCollector =
+            new ThreadLocal<SimpleCollector<RowData>>() {
+                @Override
+                public SimpleCollector initialValue() {
+                    return new SimpleCollector();
+                }
+            };
+
+    DynamicPulsarDeserializationSchema(
+            int physicalArity,
+            @Nullable DeserializationSchema<RowData> keyDeserialization,
+            int[] keyProjection,
+            DeserializationSchema<RowData> valueDeserialization,
+            int[] valueProjection,
+            boolean hasMetadata,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean upsertMode,
+    String inlongMetric) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keyDeserialization != null && keyProjection.length > 0,
+                    "Key must be set in upsert mode for deserialization schema.");
+        }
+        this.keyDeserialization = ThreadSafeDeserializationSchema.of(keyDeserialization);
+        this.valueDeserialization = ThreadSafeDeserializationSchema.of(valueDeserialization);
+        this.hasMetadata = hasMetadata;
+        this.outputCollector = new OutputProjectionCollector(
+                physicalArity,
+                keyProjection,
+                valueProjection,
+                metadataConverters,
+                upsertMode);
+        this.producedTypeInfo = producedTypeInfo;
+        this.upsertMode = upsertMode;
+        this.inlongMetric = inlongMetric;
+    }
+
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) throws Exception {
+        if (keyDeserialization != null) {
+            keyDeserialization.open(context);
+        }
+        valueDeserialization.open(context);
+
+        if (inlongMetric != null && !inlongMetric.isEmpty()) {
+            sourceMetricData = new SourceMetricData(context.getMetricGroup());
+            String[] inLongMetricArray = inlongMetric.split(DELIMITER);
+            String groupId = inLongMetricArray[0];
+            String streamId = inLongMetricArray[1];
+            String nodeId = inLongMetricArray[2];
+            sourceMetricData.registerMetricsForNumBytesIn(groupId,
+                streamId, nodeId, NUM_BYTES_IN);
+            sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId,
+                streamId, nodeId, NUM_BYTES_IN_PER_SECOND);
+            sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId,
+                nodeId, NUM_RECORDS_IN);
+            sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId, streamId,
+                nodeId, NUM_RECORDS_IN_PER_SECOND);
+        }
+
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    @Override
+    public RowData deserialize(Message<RowData> message) throws IOException {
+        final SimpleCollector<RowData> collector = tlsCollector.get();
+        deserialize(message, collector);
+        return collector.takeRecord();
+    }
+
+    @Override
+    public void deserialize(Message<RowData> message, Collector<RowData> collector) throws IOException {
+        // shortcut in case no output projection is required,
+        // also not for a cartesian product with the keys
+        if (keyDeserialization == null && !hasMetadata) {
+            valueDeserialization.deserialize(message.getData(), collector);
+            if (sourceMetricData != null) {
+                sourceMetricData.getNumRecordsIn().inc(1L);
+                sourceMetricData.getNumBytesIn()
+                    .inc(message.getData().length);
+            }
+            return;
+        }
+        BufferingCollector keyCollector = new BufferingCollector();
+
+        // buffer key(s)
+        if (keyDeserialization != null) {
+            keyDeserialization.deserialize(message.getKeyBytes(), keyCollector);
+        }
+
+        // project output while emitting values
+        outputCollector.inputMessage = message;
+        outputCollector.physicalKeyRows = keyCollector.buffer;
+        outputCollector.outputCollector = collector;
+        if ((message.getData() == null || message.getData().length == 0) && upsertMode) {
+            // collect tombstone messages in upsert mode by hand
+            outputCollector.collect(null);
+        } else {
+            valueDeserialization.deserialize(message.getData(), outputCollector);
+            if (sourceMetricData != null) {
+                sourceMetricData.getNumRecordsIn().inc(1L);
+                sourceMetricData.getNumBytesIn()
+                    .inc(message.getData().length);
+            }
+        }
+
+        keyCollector.buffer.clear();
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    @Override
+    public Schema<RowData> getSchema() {
+        return new FlinkSchema<>(Schema.BYTES.getSchemaInfo(), null, valueDeserialization);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    interface MetadataConverter extends Serializable {
+        Object read(Message<?> message);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static final class BufferingCollector implements Collector<RowData>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final List<RowData> buffer = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            buffer.add(record);
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+    }
+
+    private static class SimpleCollector<T> implements Collector<T> {
+        private T record;
+
+        @Override
+        public void collect(T record) {
+            this.record = record;
+        }
+
+        @Override
+        public void close() {
+
+        }
+
+        private T getRecord() {
+            return record;
+        }
+
+        private T takeRecord() {
+            T result = record;
+            reset();
+            return result;
+        }
+
+        private void reset() {
+            record = null;
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Emits a row with key, value, and metadata fields.
+     *
+     * <p>The collector is able to handle the following kinds of keys:
+     * <ul>
+     *     <li>No key is used.
+     *     <li>A key is used.
+     *     <li>The deserialization schema emits multiple keys.
+     *     <li>Keys and values have overlapping fields.
+     *     <li>Keys are used and value is null.
+     * </ul>
+     */
+    private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final int physicalArity;
+
+        private final int[] keyProjection;
+
+        private final int[] valueProjection;
+
+        private final MetadataConverter[] metadataConverters;
+
+        private final boolean upsertMode;
+
+        private transient Message<?> inputMessage;
+
+        private transient List<RowData> physicalKeyRows;
+
+        private transient Collector<RowData> outputCollector;
+
+        OutputProjectionCollector(
+                int physicalArity,
+                int[] keyProjection,
+                int[] valueProjection,
+                MetadataConverter[] metadataConverters,
+                boolean upsertMode) {
+            this.physicalArity = physicalArity;
+            this.keyProjection = keyProjection;
+            this.valueProjection = valueProjection;
+            this.metadataConverters = metadataConverters;
+            this.upsertMode = upsertMode;
+        }
+
+        @Override
+        public void collect(RowData physicalValueRow) {
+            // no key defined
+            if (keyProjection.length == 0) {
+                emitRow(null, (GenericRowData) physicalValueRow);
+                return;
+            }
+
+            // otherwise emit a value for each key
+            for (RowData physicalKeyRow : physicalKeyRows) {
+                emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+
+        private void emitRow(@Nullable GenericRowData physicalKeyRow, @Nullable GenericRowData physicalValueRow) {
+            final RowKind rowKind;
+            if (physicalValueRow == null) {
+                if (upsertMode) {
+                    rowKind = RowKind.DELETE;
+                } else {
+                    throw new DeserializationException(
+                            "Invalid null value received in non-upsert mode. "
+                                + "Could not set row kind for output record.");
+                }
+            } else {
+                rowKind = physicalValueRow.getRowKind();
+            }
+
+            final int metadataArity = metadataConverters.length;
+            final GenericRowData producedRow = new GenericRowData(
+                    rowKind,
+                    physicalArity + metadataArity);
+
+            if (physicalValueRow != null) {
+                for (int valuePos = 0; valuePos < valueProjection.length; valuePos++) {
+                    producedRow.setField(valueProjection[valuePos], physicalValueRow.getField(valuePos));
+                }
+            }
+
+            for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
+                assert physicalKeyRow != null;
+                producedRow.setField(keyProjection[keyPos], physicalKeyRow.getField(keyPos));
+            }
+
+            for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
+                producedRow.setField(physicalArity + metadataPos, metadataConverters[metadataPos].read(inputMessage));
+            }
+
+            outputCollector.collect(producedRow);
+        }
+    }
+}
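
As a rough sketch of the accounting that deserialize() performs above: each successfully decoded
message increments the record counter by one and the byte counter by the payload size. SimpleCounter
stands in here for the counters that SourceMetricData registers against the real metric group:

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.metrics.SimpleCounter;

    public class SourceMetricCountingSketch {

        public static void main(String[] args) {
            // Stand-ins for the counters registered in open(); the real ones live in SourceMetricData.
            SimpleCounter numRecordsIn = new SimpleCounter();
            SimpleCounter numBytesIn = new SimpleCounter();

            // Mirrors deserialize(): one record and payload.length bytes per decoded message.
            byte[][] payloads = {
                    "{\"id\":1}".getBytes(StandardCharsets.UTF_8),
                    "{\"id\":2}".getBytes(StandardCharsets.UTF_8)
            };
            for (byte[] payload : payloads) {
                numRecordsIn.inc(1L);
                numBytesIn.inc(payload.length);
            }

            System.out.println("records=" + numRecordsIn.getCount() + " bytes=" + numBytesIn.getCount());
        }
    }
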
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index 47d90f375..321d3f830 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
 import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -78,6 +77,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
 import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.validateTableSourceOptions;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
 
 /**
  * Copy from io.streamnative.connectors:pulsar-flink-connector_2.11:1.13.6.1-rc9
@@ -272,6 +272,9 @@ public class PulsarDynamicTableFactory implements
 
         String adminUrl = tableOptions.get(ADMIN_URL);
         String serviceUrl = tableOptions.get(SERVICE_URL);
+
+        String inlongMetric = tableOptions.get(INLONG_METRIC);
+
         return createPulsarTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -284,7 +287,8 @@ public class PulsarDynamicTableFactory implements
                 serviceUrl,
                 adminUrl,
                 properties,
-                startupOptions);
+                startupOptions,
+            inlongMetric);
     }
 
     @Override
@@ -322,6 +326,8 @@ public class PulsarDynamicTableFactory implements
         options.add(SINK_MESSAGE_ROUTER);
         options.add(SINK_PARALLELISM);
         options.add(PROPERTIES);
+        options.add(INLONG_METRIC);
+
         return options;
     }
 
@@ -351,7 +357,8 @@ public class PulsarDynamicTableFactory implements
             String serviceUrl,
             String adminUrl,
             Properties properties,
-            PulsarTableOptions.StartupOptions startupOptions) {
+            PulsarTableOptions.StartupOptions startupOptions,
+        String inLongMetric) {
         return new PulsarDynamicTableSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -365,6 +372,7 @@ public class PulsarDynamicTableFactory implements
                 adminUrl,
                 properties,
                 startupOptions,
-                false);
+                false,
+            inLongMetric);
     }
 }
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
new file mode 100644
index 000000000..6f75555a2
--- /dev/null
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -0,0 +1,493 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.table;
+
+import static org.apache.flink.table.descriptors.PulsarValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
+import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
+import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
+import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
+import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+/**
+ * Pulsar dynamic table source.
+ */
+public class PulsarDynamicTableSource implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    /** Watermark strategy that is used to generate per-partition watermark. */
+    protected @Nullable
+    WatermarkStrategy<RowData> watermarkStrategy;
+
+    // --------------------------------------------------------------------------------------------
+    // Format attributes
+    // --------------------------------------------------------------------------------------------
+
+    private static final String VALUE_METADATA_PREFIX = "value.";
+
+    /** Data type to configure the formats. */
+    protected final DataType physicalDataType;
+
+    /** Optional format for decoding keys from Pulsar. */
+    protected final @Nullable
+    DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
+
+    /** Format for decoding values from Pulsar. */
+    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
+
+    /** Indices that determine the key fields and the target position in the produced row. */
+    protected final int[] keyProjection;
+
+    /** Indices that determine the value fields and the target position in the produced row. */
+    protected final int[] valueProjection;
+
+    /** Prefix that needs to be removed from fields when constructing the physical data type. */
+    @Nullable
+    protected final String keyPrefix;
+    // --------------------------------------------------------------------------------------------
+    // Pulsar-specific attributes
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * The Pulsar topic to consume.
+     */
+    protected final List<String> topics;
+
+    /**
+     * The Pulsar topic pattern to consume.
+     */
+    protected final String topicPattern;
+
+    /**
+     * The Pulsar service URL.
+     */
+    protected final String serviceUrl;
+
+    /**
+     * The Pulsar admin URL.
+     */
+    protected final String adminUrl;
+
+    /**
+     * Properties for the Pulsar consumer.
+     */
+    protected final Properties properties;
+
+    /**
+     * The startup mode for the contained consumer (default is {@link StartupMode#LATEST}).
+     */
+    protected final PulsarTableOptions.StartupOptions startupOptions;
+
+    /**
+     * The default value when startup timestamp is not used.
+     */
+    private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;
+
+    /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. **/
+    protected final boolean upsertMode;
+
+    protected String inLongMetric;
+
+    public PulsarDynamicTableSource(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            List<String> topics,
+            String topicPattern,
+            String serviceUrl,
+            String adminUrl,
+            Properties properties,
+            PulsarTableOptions.StartupOptions startupOptions,
+            boolean upsertMode,
+            String inlongMetric) {
+        this.producedDataType = physicalDataType;
+        setTopicInfo(properties, topics, topicPattern);
+
+        // Format attributes
+        this.physicalDataType = Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+        this.keyDecodingFormat = keyDecodingFormat;
+        this.valueDecodingFormat = Preconditions.checkNotNull(
+                valueDecodingFormat, "Value decoding format must not be null.");
+        this.keyProjection = Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
+        this.valueProjection = Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
+        this.keyPrefix = keyPrefix;
+        // Mutable attributes
+        this.producedDataType = physicalDataType;
+        this.metadataKeys = new ArrayList<>();
+        this.watermarkStrategy = null;
+        // Pulsar-specific attributes
+        Preconditions.checkArgument((topics != null && topicPattern == null)
+                || (topics == null && topicPattern != null),
+                "Either Topic or Topic Pattern must be set for source.");
+        this.topics = topics;
+        this.topicPattern = topicPattern;
+        this.adminUrl = adminUrl;
+        this.serviceUrl = serviceUrl;
+        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+        this.startupOptions = startupOptions;
+        this.upsertMode = upsertMode;
+        this.inLongMetric = inlongMetric;
+
+    }
+
+    private void setTopicInfo(Properties properties, List<String> topics, String topicPattern) {
+        if (StringUtils.isNotBlank(topicPattern)) {
+            properties.putIfAbsent("topicspattern", topicPattern);
+            properties.remove("topic");
+            properties.remove("topics");
+        } else if (topics != null && topics.size() > 1) {
+            properties.putIfAbsent("topics", StringUtils.join(topics, ","));
+            properties.remove("topicspattern");
+            properties.remove("topic");
+        } else if (topics != null && topics.size() == 1) {
+            properties.putIfAbsent("topic", StringUtils.join(topics, ","));
+            properties.remove("topicspattern");
+            properties.remove("topics");
+        } else {
+            throw new RuntimeException("Use `topics` instead of `topic` for multi topic read");
+        }
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return valueDecodingFormat.getChangelogMode();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+
+        final DeserializationSchema<RowData> keyDeserialization =
+                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
+
+        final DeserializationSchema<RowData> valueDeserialization =
+                createDeserialization(context, valueDecodingFormat, valueProjection, "");
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+        PulsarDeserializationSchema<RowData> deserializationSchema = createPulsarDeserialization(keyDeserialization,
+                valueDeserialization,
+                producedTypeInfo);
+        final ClientConfigurationData clientConfigurationData = PulsarClientUtils.newClientConf(serviceUrl, properties);
+        FlinkPulsarSource<RowData> source = new FlinkPulsarSource<>(
+                adminUrl,
+                clientConfigurationData,
+                deserializationSchema,
+                properties
+        );
+
+        if (watermarkStrategy != null) {
+            source.assignTimestampsAndWatermarks(watermarkStrategy);
+        }
+
+        switch (startupOptions.startupMode) {
+            case EARLIEST:
+                source.setStartFromEarliest();
+                break;
+            case LATEST:
+                source.setStartFromLatest();
+                break;
+            case SPECIFIC_OFFSETS:
+                source.setStartFromSpecificOffsets(startupOptions.specificOffsets);
+                break;
+            case EXTERNAL_SUBSCRIPTION:
+                MessageId subscriptionPosition = MessageId.latest;
+                if (CONNECTOR_STARTUP_MODE_VALUE_EARLIEST.equals(startupOptions.externalSubStartOffset)) {
+                    subscriptionPosition = MessageId.earliest;
+                }
+                source.setStartFromSubscription(startupOptions.externalSubscriptionName, subscriptionPosition);
+        }
+        return SourceFunctionProvider.of(source, false);
+    }
+
+    private PulsarDeserializationSchema<RowData> createPulsarDeserialization(
+            DeserializationSchema<RowData> keyDeserialization, DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+        final DynamicPulsarDeserializationSchema.MetadataConverter[] metadataConverters = metadataKeys.stream()
+                .map(k ->
+                        Stream.of(ReadableMetadata.values())
+                                .filter(rm -> rm.key.equals(k))
+                                .findFirst()
+                                .orElseThrow(IllegalStateException::new))
+                .map(m -> m.converter)
+                .toArray(DynamicPulsarDeserializationSchema.MetadataConverter[]::new);
+
+        // check if connector metadata is used at all
+        final boolean hasMetadata = metadataKeys.size() > 0;
+
+        // adjust physical arity with value format's metadata
+        final int adjustedPhysicalArity = producedDataType.getChildren().size() - metadataKeys.size();
+
+        // adjust value format projection to include value format's metadata columns at the end
+        final int[] adjustedValueProjection = IntStream.concat(
+                IntStream.of(valueProjection),
+                IntStream.range(keyProjection.length + valueProjection.length, adjustedPhysicalArity))
+                .toArray();
+
+        return new DynamicPulsarDeserializationSchema(
+                adjustedPhysicalArity,
+                keyDeserialization,
+                keyProjection,
+                valueDeserialization,
+                adjustedValueProjection,
+                hasMetadata,
+                metadataConverters,
+                producedTypeInfo,
+                upsertMode,
+            inLongMetric);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        final PulsarDynamicTableSource copy = new PulsarDynamicTableSource(
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                serviceUrl,
+                adminUrl,
+                properties,
+                startupOptions,
+                false, inLongMetric);
+        copy.producedDataType = producedDataType;
+        copy.metadataKeys = metadataKeys;
+        copy.watermarkStrategy = watermarkStrategy;
+        return copy;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Pulsar universal table source";
+    }
+
+    private static ClientConfigurationData newClientConf(String serviceUrl) {
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        clientConf.setServiceUrl(serviceUrl);
+        return clientConf;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof PulsarDynamicTableSource)) {
+            return false;
+        }
+        PulsarDynamicTableSource that = (PulsarDynamicTableSource) o;
+        return upsertMode == that.upsertMode && Objects.equals(producedDataType, that.producedDataType)
+            && Objects.equals(metadataKeys, that.metadataKeys)
+            && Objects.equals(watermarkStrategy, that.watermarkStrategy)
+            && Objects.equals(physicalDataType, that.physicalDataType)
+            && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
+            && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
+            && Arrays.equals(keyProjection, that.keyProjection)
+            && Arrays.equals(valueProjection, that.valueProjection)
+            && Objects.equals(keyPrefix, that.keyPrefix)
+            && Objects.equals(topics, that.topics)
+            && Objects.equals(topicPattern, that.topicPattern)
+            && Objects.equals(serviceUrl, that.serviceUrl)
+            && Objects.equals(adminUrl, that.adminUrl)
+            && Objects.equals(new HashMap<>(properties), new HashMap<>(that.properties))
+            && Objects.equals(startupOptions, that.startupOptions);
+    }
+
+    @Override
+    public int hashCode() {
+        int result =
+                Objects.hash(producedDataType, metadataKeys, watermarkStrategy, physicalDataType, keyDecodingFormat,
+                        valueDecodingFormat, keyPrefix, topics, topicPattern, serviceUrl, adminUrl, properties,
+                        startupOptions,
+                        upsertMode);
+        result = 31 * result + Arrays.hashCode(keyProjection);
+        result = 31 * result + Arrays.hashCode(valueProjection);
+        return result;
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+        // according to convention, the order of the final row must be
+        // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+        // where the format metadata has highest precedence
+
+        // add value format metadata with prefix
+        valueDecodingFormat
+                .listReadableMetadata()
+                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
+
+        // add connector metadata
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        // separate connector and format metadata
+        final List<String> formatMetadataKeys = metadataKeys.stream()
+                .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+                .collect(Collectors.toList());
+        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
+        connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+        // push down format metadata
+        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
+        if (formatMetadata.size() > 0) {
+            final List<String> requestedFormatMetadataKeys = formatMetadataKeys.stream()
+                    .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
+                    .collect(Collectors.toList());
+            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        }
+
+        this.metadataKeys = connectorMetadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    private @Nullable
+    DeserializationSchema<RowData> createDeserialization(
+            Context context,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        DataType physicalFormatDataType = DataTypeUtils.projectRow(this.physicalDataType, projection);
+        if (prefix != null) {
+            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+        }
+        return format.createRuntimeDecoder(context, physicalFormatDataType);
+    }
+
+    @Override
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+    enum ReadableMetadata {
+
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                message -> StringData.fromString(message.getTopicName())
+        ),
+
+        MESSAGE_ID(
+                "messageId",
+                DataTypes.BYTES().notNull(),
+                message -> message.getMessageId().toByteArray()),
+
+        SEQUENCE_ID(
+                "sequenceId",
+                DataTypes.BIGINT().notNull(),
+                Message::getSequenceId),
+
+        PUBLISH_TIME(
+                "publishTime",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                message -> TimestampData.fromEpochMillis(message.getPublishTime())),
+
+        EVENT_TIME(
+                "eventTime",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                message -> TimestampData.fromEpochMillis(message.getEventTime())),
+
+        PROPERTIES(
+                "properties",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()).notNull(),
+                message -> {
+                    final Map<StringData, StringData> map = new HashMap<>();
+                    for (Map.Entry<String, String> e: message.getProperties().entrySet()) {
+                        map.put(StringData.fromString(e.getKey()), StringData.fromString(e.getValue()));
+                    }
+                    return new GenericMapData(map);
+                }
+        );
+
+        final String key;
+
+        final DataType dataType;
+
+        final DynamicPulsarDeserializationSchema.MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType,
+                         DynamicPulsarDeserializationSchema.MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
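
The keys declared in ReadableMetadata surface as Flink METADATA columns. A hypothetical table
definition exposing two of them might look like the sketch below, with the same placeholder
connector options as before:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PulsarMetadataColumnsExample {

        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // 'topic' and 'publishTime' are keys from ReadableMetadata above;
            // VIRTUAL keeps them read-only. Connector options remain placeholders.
            tableEnv.executeSql(
                    "CREATE TABLE pulsar_source_with_meta (\n"
                            + "  id INT,\n"
                            + "  msg_topic STRING METADATA FROM 'topic' VIRTUAL,\n"
                            + "  publish_time TIMESTAMP_LTZ(3) METADATA FROM 'publishTime' VIRTUAL\n"
                            + ") WITH (\n"
                            + "  'connector' = 'pulsar-inlong',\n"
                            + "  'topic' = 'persistent://public/default/demo',\n"
                            + "  'service-url' = 'pulsar://localhost:6650',\n"
                            + "  'admin-url' = 'http://localhost:8080',\n"
                            + "  'format' = 'json',\n"
                            + "  'inlong.metric' = 'demo_group&demo_stream&node_1'\n"
                            + ")");
        }
    }
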
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index b8a41360e..bf539d703 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.connectors.pulsar.config.StartupMode;
-import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
 import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
 import org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions;
 import org.apache.flink.streaming.connectors.pulsar.util.KeyHashMessageRouterImpl;
@@ -68,6 +67,7 @@ import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
 import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createValueFormatProjection;
 import static org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.pulsar.table.Constants.INLONG_METRIC;
 
 /**
  * Upsert-Pulsar factory.
@@ -150,6 +150,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
         options.add(VALUE_FIELDS_INCLUDE);
         options.add(FactoryUtil.SINK_PARALLELISM);
         options.add(PROPERTIES);
+        options.add(INLONG_METRIC);
         return options;
     }
 
@@ -185,6 +186,8 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
         String serverUrl = tableOptions.get(SERVICE_URL);
         List<String> topics = tableOptions.get(TOPIC);
         String topicPattern = tableOptions.get(TOPIC_PATTERN);
+        String inlongMetric = tableOptions.get(INLONG_METRIC);
+
         return new PulsarDynamicTableSource(
                 schema.toPhysicalRowDataType(),
                 keyDecodingFormat,
@@ -198,7 +201,7 @@ public class UpsertPulsarDynamicTableFactory implements DynamicTableSourceFactor
                 adminUrl,
                 properties,
                 startupOptions,
-                true);
+                true, inlongMetric);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 8def7c890..ece6f3022 100644
--- a/inlong-sort/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/inlong-sort/sort-connectors/pulsar/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,4 +14,5 @@
 # limitations under the License.
 
 org.apache.inlong.sort.formats.inlongmsg.InLongMsgFormatFactory
-org.apache.inlong.sort.pulsar.table.PulsarDynamicTableFactory
\ No newline at end of file
+org.apache.inlong.sort.pulsar.table.PulsarDynamicTableFactory
+org.apache.inlong.sort.pulsar.table.UpsertPulsarDynamicTableFactory
\ No newline at end of file