Posted to commits@inlong.apache.org by do...@apache.org on 2021/08/26 13:36:27 UTC

[incubator-inlong] branch master updated: [INLONG-742] Sort Support Pulsar (#1481)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dadc59  [INLONG-742] Sort Support Pulsar (#1481)
7dadc59 is described below

commit 7dadc59609d1734e1d0550992f24a1414ddc3b95
Author: Kevin Wen <89...@users.noreply.github.com>
AuthorDate: Thu Aug 26 21:36:21 2021 +0800

    [INLONG-742] Sort Support Pulsar (#1481)
---
 inlong-sort/pom.xml                                |  13 +
 .../inlong/sort/configuration/Constants.java       |   5 +
 .../apache/inlong/sort/protocol/DataFlowInfo.java  |  46 +-
 .../sort/protocol/source/PulsarSourceInfo.java     |  19 +
 inlong-sort/sort-core/pom.xml                      |  32 ++
 .../org/apache/inlong/sort/flink/SourceEvent.java  |  82 +++
 .../sort/flink/hive/HiveMultiTenantWriter.java     |   3 +-
 .../MultiTenantFunctionInitializationContext.java  |  24 +-
 .../flink/pulsar/MultiTenancyPulsarConsumer.java   | 171 ++++++
 .../pulsar/MultiTopicPulsarSourceFunction.java     | 201 +++++++
 .../flink/pulsar/PulsarDeserializationSchema.java  |  52 ++
 .../inlong/sort/flink/pulsar/PulsarOptions.java    | 122 +++++
 .../sort/flink/pulsar/PulsarSourceFunction.java    | 580 +++++++++++++++++++++
 .../inlong/sort/flink/pulsar/PulsarUtils.java      | 223 ++++++++
 .../SerializedRecordDeserializationSchema.java     |  46 ++
 .../pulsar/MultiTopicPulsarSourceFunctionTest.java | 236 +++++++++
 .../flink/pulsar/PulsarTestMetaManagerUtil.java    |  88 ++++
 .../sort/flink/pulsar/TestSourceContext.java       |  71 +++
 .../inlong/sort/util/TestMetaManagerUtil.java      |  65 +++
 19 files changed, 2071 insertions(+), 8 deletions(-)
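
For reference, a minimal sketch (not part of this commit) of wiring the new PulsarSourceFunction into a Flink job. The admin/service URLs, topic, and subscription below are placeholders; the constructor and the PulsarDeserializationSchema contract are taken from the files added in this push.

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.inlong.sort.flink.pulsar.PulsarDeserializationSchema;
    import org.apache.inlong.sort.flink.pulsar.PulsarSourceFunction;
    import org.apache.pulsar.client.api.Message;

    public class PulsarSourceExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Decode each Pulsar Message into a String and report the payload length.
            PulsarDeserializationSchema<String> schema = new PulsarDeserializationSchema<String>() {
                @Override
                public DeserializationResult<String> deserialize(Message message) {
                    byte[] data = message.getData();
                    return DeserializationResult.of(new String(data), data.length);
                }

                @Override
                public TypeInformation<String> getProducedType() {
                    return TypeInformation.of(String.class);
                }
            };

            PulsarSourceFunction<String> source = new PulsarSourceFunction<>(
                    "http://localhost:8080",            // admin url (placeholder)
                    "pulsar://localhost:6650",          // service url (placeholder)
                    "persistent://public/default/in",   // topic (placeholder)
                    "sort-example-subscription",        // consumer group, used as the subscription
                    schema,
                    new Configuration());

            env.addSource(source).print();
            env.execute("pulsar-source-example");
        }
    }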

diff --git a/inlong-sort/pom.xml b/inlong-sort/pom.xml
index 83ae849..f40b41e 100644
--- a/inlong-sort/pom.xml
+++ b/inlong-sort/pom.xml
@@ -53,6 +53,7 @@
         <snappy.version>1.1.4</snappy.version>
         <hadoop.version>2.8.5</hadoop.version>
         <hive.version>2.3.8</hive.version>
+        <pulsar.version>2.6.2</pulsar.version>
     </properties>
 
     <modules>
@@ -305,6 +306,18 @@
                 </exclusions>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.pulsar</groupId>
+                <artifactId>pulsar-client-admin</artifactId>
+                <version>${pulsar.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.pulsar</groupId>
+                <artifactId>pulsar-client</artifactId>
+                <version>${pulsar.version}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index aa9391e..029cc77 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -48,6 +48,11 @@ public class Constants {
      */
     public static final String DATA_TIME_FIELD = "dt";
 
+    /**
+     * The prefix of Pulsar source configuration keys in data flow properties.
+     */
+    public static final String PULSAR_SOURCE_PREFIX = "pulsar.source.";
+
     // ------------------------------------------------------------------------
     //  Common configs
     // ------------------------------------------------------------------------
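
The prefix is consumed later in this commit by MultiTopicPulsarSourceFunction#putMapToConfig, which strips it before applying a data flow property to the per-source Flink configuration. A small sketch of the convention (the key and value below are illustrative):

    Map<String, Object> properties = new HashMap<>();
    properties.put(Constants.PULSAR_SOURCE_PREFIX + "consumer.receive-timeout", "60s");

    org.apache.flink.configuration.Configuration config =
            new org.apache.flink.configuration.Configuration();
    for (Map.Entry<String, Object> entry : properties.entrySet()) {
        if (entry.getKey().startsWith(Constants.PULSAR_SOURCE_PREFIX)) {
            // "pulsar.source.consumer.receive-timeout" becomes "consumer.receive-timeout"
            config.setString(entry.getKey().replaceFirst(Constants.PULSAR_SOURCE_PREFIX, ""),
                    entry.getValue().toString());
        }
    }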
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowInfo.java
index 22b1811..923e309 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/DataFlowInfo.java
@@ -19,11 +19,16 @@ package org.apache.inlong.sort.protocol;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import org.apache.inlong.sort.protocol.sink.SinkInfo;
-import org.apache.inlong.sort.protocol.source.SourceInfo;
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.inlong.sort.protocol.sink.SinkInfo;
+import org.apache.inlong.sort.protocol.source.SourceInfo;
 
 /**
  * Data flow protocol.
@@ -38,14 +43,26 @@ public class DataFlowInfo implements Serializable {
 
     private final SinkInfo sinkInfo;
 
+    @JsonInclude(Include.NON_NULL)
+    private final Map<String, Object> properties;
+
     @JsonCreator
     public DataFlowInfo(
             @JsonProperty("id") long id,
             @JsonProperty("source_info") SourceInfo sourceInfo,
-            @JsonProperty("sink_info") SinkInfo sinkInfo) {
+            @JsonProperty("sink_info") SinkInfo sinkInfo,
+            @JsonProperty("properties") Map<String, Object> properties) {
         this.id = id;
         this.sourceInfo = checkNotNull(sourceInfo);
         this.sinkInfo = checkNotNull(sinkInfo);
+        this.properties = properties == null ? new HashMap<>() : properties;
+    }
+
+    public DataFlowInfo(long id, SourceInfo sourceInfo, SinkInfo sinkInfo) {
+        this.id = id;
+        this.sourceInfo = sourceInfo;
+        this.sinkInfo = sinkInfo;
+        this.properties = new HashMap<>();
     }
 
     @JsonProperty("id")
@@ -62,4 +79,27 @@ public class DataFlowInfo implements Serializable {
     public SinkInfo getSinkInfo() {
         return sinkInfo;
     }
+
+    @JsonProperty("properties")
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        DataFlowInfo other = (DataFlowInfo) o;
+
+        return Objects.equals(id, other.id)
+                && Objects.equals(sourceInfo, other.sourceInfo)
+                && Objects.equals(sinkInfo, other.sinkInfo)
+                && Objects.equals(properties, other.properties);
+    }
 }
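
For reference, the wire format implied by the Jackson annotations above, with the source and sink bodies elided; the single property shown is illustrative. "properties" is optional and defaults to an empty map when absent:

    {
      "id": 1,
      "source_info": { ... },
      "sink_info": { ... },
      "properties": { "pulsar.source.consumer.bootstrap-mode": "earliest" }
    }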
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfo.java
index c5da028..e669ec0 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/source/PulsarSourceInfo.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.protocol.source;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Objects;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.inlong.sort.protocol.FieldInfo;
@@ -71,4 +72,22 @@ public class PulsarSourceInfo extends SourceInfo {
     public String getSubscriptionName() {
         return subscriptionName;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        PulsarSourceInfo other = (PulsarSourceInfo) o;
+        return super.equals(other)
+                && Objects.equals(adminUrl, other.adminUrl)
+                && Objects.equals(serviceUrl, other.serviceUrl)
+                && Objects.equals(subscriptionName, other.subscriptionName)
+                && Objects.equals(topic, other.topic);
+    }
 }
diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml
index 216f9b6..1b7cadd 100644
--- a/inlong-sort/sort-core/pom.xml
+++ b/inlong-sort/sort-core/pom.xml
@@ -163,6 +163,38 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-admin</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.15.3</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-api</artifactId>
             <version>${project.version}</version>
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SourceEvent.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SourceEvent.java
new file mode 100644
index 0000000..37441ce
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/SourceEvent.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
+import org.apache.inlong.sort.protocol.source.SourceInfo;
+
+public class SourceEvent implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final SourceEventType sourceEventType;
+
+    private final long dataFlowId;
+
+    private final SourceInfo sourceInfo;
+
+    private final Map<String, Object> properties;
+
+    public SourceEvent(SourceEventType sourceEventType, long dataFlowId, PulsarSourceInfo pulsarSourceInfo,
+            Map<String, Object> properties) {
+        this.sourceEventType = sourceEventType;
+        this.dataFlowId = dataFlowId;
+        this.sourceInfo = checkNotNull(pulsarSourceInfo);
+        this.properties = properties;
+    }
+
+    public SourceEventType getSourceEventType() {
+        return sourceEventType;
+    }
+
+    public SourceInfo getSourceInfo() {
+        return sourceInfo;
+    }
+
+    public long getDataFlowId() {
+        return dataFlowId;
+    }
+
+    public Map<String, Object> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public String toString() {
+        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+                .append("sourceEventType", sourceEventType)
+                .append("dataFlowId", dataFlowId)
+                .append("sourceInfo", sourceInfo)
+                .append("properties", properties)
+                .toString();
+    }
+
+    public enum SourceEventType {
+        ADDED,
+        UPDATE,
+        REMOVED
+    }
+}
+
+
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/hive/HiveMultiTenantWriter.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/hive/HiveMultiTenantWriter.java
index fb71fd9..3d2a39f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/hive/HiveMultiTenantWriter.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/hive/HiveMultiTenantWriter.java
@@ -171,7 +171,8 @@ public class HiveMultiTenantWriter extends ProcessFunction<SerializedRecord, Par
                 HiveWriter hiveWriter = new HiveWriter(configuration, dataFlowId, hiveSinkInfo);
                 hiveWriter.setRuntimeContext(getRuntimeContext());
                 hiveWriter.initializeState(
-                        new MultiTenantFunctionInitializationContext(dataFlowId, functionInitializationContext));
+                        new MultiTenantFunctionInitializationContext(dataFlowId, functionInitializationContext,
+                                getRuntimeContext().getExecutionConfig()));
                 hiveWriter.open(new org.apache.flink.configuration.Configuration());
                 hiveWriters.put(dataFlowId, hiveWriter);
                 recordTransformer.addDataFlow(dataFlowInfo);
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/multitenant/MultiTenantFunctionInitializationContext.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/multitenant/MultiTenantFunctionInitializationContext.java
index 0dc355e..ab7a076 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/multitenant/MultiTenantFunctionInitializationContext.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/multitenant/MultiTenantFunctionInitializationContext.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
@@ -41,9 +42,11 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
 
     private final MultiTenantOperatorStateStore tenantOperatorStateStore;
 
-    public MultiTenantFunctionInitializationContext(long tenantId, FunctionInitializationContext parentContext) {
+    public MultiTenantFunctionInitializationContext(long tenantId, FunctionInitializationContext parentContext,
+            ExecutionConfig executionConfig) {
         this.parentContext = checkNotNull(parentContext);
-        tenantOperatorStateStore = new MultiTenantOperatorStateStore(tenantId, parentContext.getOperatorStateStore());
+        tenantOperatorStateStore = new MultiTenantOperatorStateStore(tenantId, parentContext.getOperatorStateStore(),
+                executionConfig);
     }
 
     @Override
@@ -73,9 +76,13 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
 
         private final OperatorStateStore operatorStateStore;
 
-        public MultiTenantOperatorStateStore(long tenantId, OperatorStateStore operatorStateStore) {
+        private final ExecutionConfig executionConfig;
+
+        public MultiTenantOperatorStateStore(long tenantId, OperatorStateStore operatorStateStore,
+                ExecutionConfig executionConfig) {
             this.tenantId = tenantId;
             this.operatorStateStore = checkNotNull(operatorStateStore);
+            this.executionConfig = executionConfig;
         }
 
         @Override
@@ -86,12 +93,14 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
 
         @Override
         public <S> ListState<S> getListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
+            serializerInit(listStateDescriptor);
             return operatorStateStore
                     .getListState(new MultiTenantListStateDescriptor<>(tenantId, listStateDescriptor));
         }
 
         @Override
         public <S> ListState<S> getUnionListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
+            serializerInit(listStateDescriptor);
             return operatorStateStore
                     .getUnionListState(new MultiTenantListStateDescriptor<>(tenantId, listStateDescriptor));
         }
@@ -111,6 +120,7 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
 
         @Override
         public <S> ListState<S> getOperatorState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
+            serializerInit(listStateDescriptor);
             return operatorStateStore
                     .getListState(new MultiTenantListStateDescriptor<S>(tenantId, listStateDescriptor));
         }
@@ -119,6 +129,12 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
         public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
             return operatorStateStore.getSerializableListState(getTenantStateName(tenantId, stateName));
         }
+
+        private <S> void serializerInit(ListStateDescriptor<S> listStateDescriptor) {
+            if (!listStateDescriptor.isSerializerInitialized()) {
+                listStateDescriptor.initializeSerializerUnlessSet(executionConfig);
+            }
+        }
     }
 
     private static class MultiTenantListStateDescriptor<T> extends ListStateDescriptor<T> {
@@ -130,4 +146,4 @@ public class MultiTenantFunctionInitializationContext implements FunctionInitial
                     listStateDescriptor.getElementSerializer());
         }
     }
-}
+}
\ No newline at end of file
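
For context, the serializerInit guard added above matters because a ListStateDescriptor may reach MultiTenantListStateDescriptor (whose constructor calls getElementSerializer()) before its serializer has been initialized. A minimal sketch of the failure mode it prevents, assuming a bare Flink ExecutionConfig:

    ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("offsets", TypeInformation.of(Long.class));

    // Calling descriptor.getElementSerializer() at this point would throw an
    // IllegalStateException, because no serializer has been initialized yet.
    descriptor.initializeSerializerUnlessSet(new ExecutionConfig());

    TypeSerializer<Long> serializer = descriptor.getElementSerializer(); // safe now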
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTenancyPulsarConsumer.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTenancyPulsarConsumer.java
new file mode 100644
index 0000000..499f4d0
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTenancyPulsarConsumer.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Consumer;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.inlong.sort.flink.tubemq.MultiTenancyTubeConsumer;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MultiTenancyPulsarConsumer<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MultiTenancyPulsarConsumer.class);
+
+    private final ConcurrentMap<Long, PulsarPullThread> pulsarPullThreads;
+
+    private final Map<Long, PulsarSourceFunction<T>> pulsarSourceFunctions;
+
+    private final Consumer<Throwable> exceptionCatcher;
+
+    private SourceContext<T> context;
+
+    public MultiTenancyPulsarConsumer(Consumer<Throwable> exceptionCatcher) {
+        this.exceptionCatcher = checkNotNull(exceptionCatcher);
+        this.pulsarPullThreads = new ConcurrentHashMap<>();
+        this.pulsarSourceFunctions = new HashMap<>();
+    }
+
+    public void start(SourceContext<T> context) throws Exception {
+        this.context = checkNotNull(context);
+    }
+
+    public void cancel() {
+        for (PulsarPullThread pulsarPullThread : pulsarPullThreads.values()) {
+            pulsarPullThread.cancel();
+            pulsarPullThread.interrupt();
+        }
+    }
+
+    /**
+     * Cancels all pull threads, waits for them to exit, and closes all source functions.
+     *
+     * @throws Exception if closing a source function fails
+     */
+    public void close() throws Exception {
+        MetaManager.release();
+        cancel();
+        for (PulsarPullThread pulsarPullThread : pulsarPullThreads.values()) {
+            try {
+                pulsarPullThread.join();
+            } catch (InterruptedException e) {
+                LOG.error("Could not cancel thread {}", pulsarPullThread.getName());
+            }
+        }
+        pulsarPullThreads.clear();
+
+        for (PulsarSourceFunction<T> sourceFunction : pulsarSourceFunctions.values()) {
+            try {
+                sourceFunction.close();
+            } catch (Throwable throwable) {
+                LOG.warn("Could not properly shutdown the pulsar pull consumer.", throwable);
+            }
+        }
+        pulsarSourceFunctions.clear();
+    }
+
+    /**
+     * Add a new pulsar source.
+     */
+    public void addPulsarSource(long dataflowId, PulsarSourceFunction<T> pulsarSourceFunction) throws Exception {
+        if (pulsarPullThreads.containsKey(dataflowId)) {
+            LOG.warn("Pull thread of dataflow-id {} has already been started", dataflowId);
+            return;
+        }
+        PulsarPullThread pulsarPullThread = new PulsarPullThread(pulsarSourceFunction);
+        pulsarPullThread.setName("PulsarPullThread-for-dataflowId-" + dataflowId);
+        pulsarPullThread.start();
+        pulsarPullThreads.put(dataflowId, pulsarPullThread);
+        pulsarSourceFunctions.put(dataflowId, pulsarSourceFunction);
+
+        LOG.info("Add pulsar source \"{}\" successfully!", dataflowId);
+    }
+
+    public void updatePulsarSource(long dataflowId, PulsarSourceFunction<T> pulsarSourceFunction) throws Exception {
+        removePulsarSource(dataflowId);
+        addPulsarSource(dataflowId, pulsarSourceFunction);
+    }
+
+    public void removePulsarSource(long dataflowId) {
+        PulsarPullThread pulsarPullThread = pulsarPullThreads.get(dataflowId);
+        if (pulsarPullThread != null) {
+            pulsarPullThread.cancel();
+            pulsarPullThread.interrupt();
+            try {
+                pulsarPullThread.join();
+            } catch (InterruptedException e) {
+                LOG.error("Could not cancel thread {}", pulsarPullThread.getName());
+            }
+            pulsarPullThreads.remove(dataflowId);
+        }
+        PulsarSourceFunction<T> sourceFunction = pulsarSourceFunctions.get(dataflowId);
+        if (sourceFunction != null) {
+            try {
+                sourceFunction.close();
+            } catch (Exception e) {
+                LOG.error("Could not properly shutdown the pulsar source function.", e);
+            }
+            pulsarSourceFunctions.remove(dataflowId);
+        }
+    }
+
+    public Collection<PulsarSourceFunction<T>> getCurrentSourceFunction() {
+        return pulsarSourceFunctions.values();
+    }
+
+    public class PulsarPullThread extends Thread {
+
+        private volatile boolean running = true;
+        private final PulsarSourceFunction<T> sourceFunction;
+
+        public PulsarPullThread(PulsarSourceFunction<T> sourceFunction) {
+            this.sourceFunction = checkNotNull(sourceFunction);
+        }
+
+        @Override
+        public void run() {
+            try {
+                sourceFunction.run(context);
+            } catch (InterruptedException e) {
+                // ignore interruption from cancelling
+                if (running) {
+                    exceptionCatcher.accept(e);
+                    LOG.warn("pulsar pull thread has been interrupted.", e);
+                }
+            } catch (Throwable t) {
+                exceptionCatcher.accept(t);
+                LOG.warn("Error occurred in pulsar pull thread.", t);
+            } finally {
+                LOG.info("pulsar pull thread stops");
+            }
+        }
+
+        public void cancel() {
+            this.running = false;
+            sourceFunction.cancel();
+        }
+    }
+}
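
A lifecycle sketch for the consumer above; sourceContext, sourceFunction, and LOG stand in for objects the enclosing Flink source provides (see MultiTopicPulsarSourceFunction in the next file):

    MultiTenancyPulsarConsumer<SerializedRecord> consumer =
            new MultiTenancyPulsarConsumer<>(t -> LOG.error("pulsar consumer failed", t));

    consumer.start(sourceContext);                  // remember the SourceContext for pull threads
    consumer.addPulsarSource(42L, sourceFunction);  // spawns "PulsarPullThread-for-dataflowId-42"
    consumer.removePulsarSource(42L);               // cancel, join, and close for that data flow
    consumer.close();                               // release all remaining threads and functions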
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunction.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunction.java
new file mode 100644
index 0000000..4def061
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunction.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.flink.SerializedRecord;
+import org.apache.inlong.sort.flink.SourceEvent;
+import org.apache.inlong.sort.flink.SourceEvent.SourceEventType;
+import org.apache.inlong.sort.flink.multitenant.MultiTenantFunctionInitializationContext;
+import org.apache.inlong.sort.flink.tubemq.MultiTopicTubeSourceFunction;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MultiTopicPulsarSourceFunction extends RichParallelSourceFunction<SerializedRecord> implements
+        CheckpointedFunction {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(MultiTopicPulsarSourceFunction.class);
+
+    private final Configuration configuration;
+    private volatile boolean running = true;
+    private volatile Throwable throwable;
+
+    private transient MetaManager metaManager;
+    private transient MultiTenancyPulsarConsumer<SerializedRecord> pulsarConsumer;
+    private transient FunctionInitializationContext functionInitializationContext;
+    private transient BlockingQueue<SourceEvent> eventQueue;
+
+    public MultiTopicPulsarSourceFunction(Configuration configuration) {
+        this.configuration = configuration;
+    }
+
+    @Override
+    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
+        super.open(parameters);
+        eventQueue = new ArrayBlockingQueue<>(configuration.getInteger(Constants.SOURCE_EVENT_QUEUE_CAPACITY));
+        this.metaManager = MetaManager.getInstance(configuration);
+        this.metaManager.registerDataFlowInfoListener(new PulsarDataFlowInfoListener());
+        this.pulsarConsumer = new MultiTenancyPulsarConsumer<>(throwable -> this.throwable = throwable);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+        this.functionInitializationContext = functionInitializationContext;
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+
+        Collection<PulsarSourceFunction<SerializedRecord>> currentSourceFunctions = pulsarConsumer
+                .getCurrentSourceFunction();
+        for (PulsarSourceFunction<SerializedRecord> sourceFunction : currentSourceFunctions) {
+            sourceFunction.snapshotState(functionSnapshotContext);
+        }
+    }
+
+    @Override
+    public void run(SourceContext<SerializedRecord> sourceContext) throws Exception {
+        pulsarConsumer.start(sourceContext);
+        while (running) {
+            if (throwable != null) {
+                throw new RuntimeException(throwable);
+            }
+            SourceEvent sourceEvent = eventQueue.poll(1000, TimeUnit.MILLISECONDS);
+            if (sourceEvent != null) {
+                processEvent(sourceEvent);
+            }
+            if (!running) {
+                return;
+            }
+        }
+    }
+
+    private void processEvent(SourceEvent sourceEvent) throws Exception {
+        SourceEventType sourceEventType = sourceEvent.getSourceEventType();
+        PulsarSourceInfo pulsarSourceInfo = (PulsarSourceInfo) sourceEvent.getSourceInfo();
+        Map<String, Object> properties = sourceEvent.getProperties();
+        long dataFlowId = sourceEvent.getDataFlowId();
+
+        switch (sourceEventType) {
+            case ADDED:
+                PulsarSourceFunction<SerializedRecord> pulsarSourceFunction = generateSourceFunction(dataFlowId,
+                        properties, pulsarSourceInfo);
+                pulsarConsumer.addPulsarSource(dataFlowId, pulsarSourceFunction);
+                break;
+            case UPDATE:
+                PulsarSourceFunction<SerializedRecord> updateSourceFunction = generateSourceFunction(dataFlowId,
+                        properties, pulsarSourceInfo);
+                pulsarConsumer.updatePulsarSource(dataFlowId, updateSourceFunction);
+                break;
+            case REMOVED:
+                pulsarConsumer.removePulsarSource(dataFlowId);
+                break;
+            default:
+                LOG.error("Unknown source event type {}", sourceEvent.getSourceEventType());
+                throw new RuntimeException("Unknown source event type " + sourceEvent.getSourceEventType());
+        }
+    }
+
+    public PulsarSourceFunction<SerializedRecord> generateSourceFunction(
+            long dataFlowId,
+            Map<String, Object> properties,
+            PulsarSourceInfo pulsarSourceInfo) throws Exception {
+        org.apache.flink.configuration.Configuration config =
+                new org.apache.flink.configuration.Configuration();
+        putMapToConfig(config, properties);
+        PulsarSourceFunction<SerializedRecord> pulsarSourceFunction = new PulsarSourceFunction<>(
+                pulsarSourceInfo.getAdminUrl(),
+                pulsarSourceInfo.getServiceUrl(),
+                pulsarSourceInfo.getTopic(),
+                pulsarSourceInfo.getSubscriptionName(),
+                new SerializedRecordDeserializationSchema(dataFlowId),
+                config);
+        pulsarSourceFunction.setRuntimeContext(getRuntimeContext());
+        pulsarSourceFunction.initializeState(
+                new MultiTenantFunctionInitializationContext(dataFlowId, functionInitializationContext,
+                        getRuntimeContext().getExecutionConfig()
+                ));
+        pulsarSourceFunction.open(config);
+        return pulsarSourceFunction;
+
+    }
+
+    public void putMapToConfig(org.apache.flink.configuration.Configuration config, Map<String, Object> properties) {
+        for (Map.Entry<String, Object> entry : properties.entrySet()) {
+            if (entry.getKey().startsWith(Constants.PULSAR_SOURCE_PREFIX)) {
+                config.setString(entry.getKey().replaceFirst(Constants.PULSAR_SOURCE_PREFIX, ""),
+                        entry.getValue().toString());
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        this.running = false;
+        pulsarConsumer.cancel();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        pulsarConsumer.close();
+        MetaManager.release();
+    }
+
+    private class PulsarDataFlowInfoListener implements MetaManager.DataFlowInfoListener {
+
+        public void queueEvent(SourceEventType eventType, DataFlowInfo dataFlowInfo) throws InterruptedException {
+            if (dataFlowInfo.getSourceInfo() instanceof PulsarSourceInfo) {
+                eventQueue.put(new SourceEvent(eventType, dataFlowInfo.getId(),
+                        (PulsarSourceInfo) dataFlowInfo.getSourceInfo(), dataFlowInfo.getProperties()));
+            } else {
+                LOG.warn("Received a non-pulsar data flow notification {}: {}", eventType, dataFlowInfo);
+            }
+        }
+
+        @Override
+        public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+            queueEvent(SourceEventType.ADDED, dataFlowInfo);
+        }
+
+
+        @Override
+        public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+            queueEvent(SourceEventType.UPDATE, dataFlowInfo);
+        }
+
+        @Override
+        public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+            queueEvent(SourceEventType.REMOVED, dataFlowInfo);
+        }
+    }
+}
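
A minimal sketch of running the multi-topic source, assuming a MetaManager backend is reachable and supplies the Pulsar data flows at runtime (the job and operator names are placeholders):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.inlong.sort.configuration.Configuration;
    import org.apache.inlong.sort.flink.pulsar.MultiTopicPulsarSourceFunction;

    public class MultiTopicPulsarJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Configuration configuration = new Configuration();

            env.addSource(new MultiTopicPulsarSourceFunction(configuration))
                    .name("pulsar-multi-topic-source")
                    .print();

            env.execute("multi-topic-pulsar-example");
        }
    }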
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarDeserializationSchema.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarDeserializationSchema.java
new file mode 100644
index 0000000..5d03987
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.pulsar.client.api.Message;
+
+public interface PulsarDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+    class DeserializationResult<T> {
+        private final T record;
+        // the data length is carried here to avoid the overhead of reading it from the Message several times
+        private final long dataLength;
+
+        private DeserializationResult(T record, long length) {
+            this.record = record;
+            this.dataLength = length;
+        }
+
+        public T getRecord() {
+            return record;
+        }
+
+        public long getDataLength() {
+            return dataLength;
+        }
+
+        public static <T> DeserializationResult<T> of(T record, long dataLength) {
+            return new DeserializationResult<>(record, dataLength);
+        }
+    }
+
+    DeserializationResult<T> deserialize(@SuppressWarnings("rawtypes") Message message) throws IOException;
+
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarOptions.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarOptions.java
new file mode 100644
index 0000000..a209820
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarOptions.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * The configuration options for Pulsar sources and sinks.
+ */
+public class PulsarOptions {
+
+    public static final String BOOTSTRAP_MODE_LATEST = "latest";
+    public static final String BOOTSTRAP_MODE_EARLIEST = "earliest";
+
+    public static final ConfigOption<String> OPERATION_TIMEOUT =
+            ConfigOptions.key("operation-timeout")
+                    .defaultValue("30s")
+                    .withDescription("Duration of waiting for completing an operation.");
+
+    public static final ConfigOption<String> CONNECTION_TIMEOUT =
+            ConfigOptions.key("connection-timeout")
+                    .defaultValue("10s")
+                    .withDescription("Duration of waiting for a connection to a broker to be established.");
+
+    public static final ConfigOption<String> REQUEST_TIMEOUT =
+            ConfigOptions.key("request-timeout")
+                    .defaultValue("60s")
+                    .withDescription("Duration of waiting for completing a request.");
+
+    public static final ConfigOption<String> KEEPALIVE_INTERVAL =
+            ConfigOptions.key("keepalive-interval")
+                    .defaultValue("30s")
+                    .withDescription("Duration of keeping alive interval for each client broker connection.");
+
+    public static final ConfigOption<String> CONSUMER_BOOTSTRAP_MODE =
+            ConfigOptions.key("consumer.bootstrap-mode")
+                    .defaultValue(BOOTSTRAP_MODE_LATEST)
+                    .withDescription("The behavior when the consumer bootstraps. This only takes effect when not "
+                            + "restoring from checkpoints.");
+
+    public static final ConfigOption<Integer> CONSUMER_RECEIVE_QUEUE_SIZE =
+            ConfigOptions.key("consumer.receive-queue-size")
+                    .defaultValue(1000)
+                    .withDescription("The size of a consumer's receiver queue.");
+
+    public static final ConfigOption<String> CONSUMER_RECEIVE_TIMEOUT =
+            ConfigOptions.key("consumer.receive-timeout")
+                    .defaultValue("120s")
+                    .withDescription("The timeout in each receiving from pulsar.");
+
+    public static final ConfigOption<String> CONSUMER_CHECK_PARTITION_INTERVAL =
+            ConfigOptions.key("consumer.check-partition-interval")
+                    .defaultValue("5s")
+                    .withDescription("Duration of checking interval for each partition reader.");
+
+    public static final ConfigOption<String> CONSUMER_MAX_IDLE_TIME =
+            ConfigOptions.key("consumer.max-idle-time")
+                    .defaultValue("60s")
+                    .withDescription("The max idle time for the pulsar consumer.");
+
+    public static final ConfigOption<Boolean> CONSUMER_ENABLE_AUTO_COMMIT =
+            ConfigOptions.key("consumer.enable-auto-commit")
+                    .defaultValue(true)
+                    .withDescription("True if the consumer is enabled to automatically commit offsets.");
+
+    public static final ConfigOption<String> CONSUMER_AUTO_COMMIT_INTERVAL =
+            ConfigOptions.key("consumer.auto-commit-interval")
+                    .defaultValue("120s")
+                    .withDescription("The interval for consumers to commit offset if automatically commit is enabled.");
+
+    public static final ConfigOption<String> PRODUCER_ROUTE_MODE =
+            ConfigOptions.key("producer.route-mode")
+                    .defaultValue("RoundRobinPartition")
+                    .withDescription("Message routing logic for producers on partitioned topics.");
+
+    public static final ConfigOption<Integer> PRODUCER_PENDING_QUEUE_SIZE =
+            ConfigOptions.key("producer.pending-queue-size")
+                    .defaultValue(1000)
+                    .withDescription("The maximum size of a queue holding pending messages.");
+
+    public static final ConfigOption<Integer> PRODUCER_PENDING_SIZE =
+            ConfigOptions.key("producer.pending-total-size")
+                    .defaultValue(50000)
+                    .withDescription("The maximum number of pending messages across partitions.");
+
+    public static final ConfigOption<Boolean> PRODUCER_BLOCK_QUEUE_FULL =
+            ConfigOptions.key("producer.block-if-queue-full")
+                    .defaultValue(true)
+                    .withDescription("When the queue is full, the method is blocked "
+                            + "instead of an exception is thrown.");
+
+    public static final ConfigOption<String> PRODUCER_SEND_TIMEOUT =
+            ConfigOptions.key("producer.send-timeout")
+                    .defaultValue("30s")
+                    .withDescription("The timeout in each sending to pulsar.");
+
+    public static final ConfigOption<Boolean> SOURCE_DISABLE_CHAINING =
+            ConfigOptions.key("source.disable-chaining")
+                    .defaultValue(false)
+                    .withDescription("The source operator will not chain with downstream if true.");
+
+    public static final ConfigOption<Boolean> SINK_START_NEW_CHAIN =
+            ConfigOptions.key("sink.start-new-chain")
+                    .defaultValue(true)
+                    .withDescription("The sink operator will start a new chain if true.");
+}
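
These options are read through Flink's Configuration. A short sketch of how PulsarSourceFunction consumes them (parseDuration is org.apache.flink.util.TimeUtils#parseDuration, statically imported as in the source function; conf is an org.apache.flink.configuration.Configuration):

    Configuration conf = new Configuration();
    conf.setString(PulsarOptions.CONSUMER_RECEIVE_TIMEOUT, "30s"); // override the 120s default

    Duration receiveTimeout = parseDuration(conf.getString(PulsarOptions.CONSUMER_RECEIVE_TIMEOUT));
    boolean autoCommitEnabled = conf.getBoolean(PulsarOptions.CONSUMER_ENABLE_AUTO_COMMIT); // true by default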
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarSourceFunction.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarSourceFunction.java
new file mode 100644
index 0000000..daff1d1
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarSourceFunction.java
@@ -0,0 +1,580 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import static org.apache.flink.util.TimeUtils.parseDuration;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.flink.pulsar.PulsarDeserializationSchema.DeserializationResult;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Reader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pulsar source (consumer) which receives messages from topics.
+ *
+ * <p>
+ * Copied from the Flink connector with slight changes. We need the whole Pulsar Message for deserialization, so we
+ * fork this source function until our requirements are satisfied in the Flink connector.
+ * </p>
+ */
+public class PulsarSourceFunction<T>
+        extends RichParallelSourceFunction<T>
+        implements ResultTypeQueryable<T>, CheckpointedFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PulsarSourceFunction.class);
+
+    private static final String OFFSETS_STATE_NAME = "pulsar-offset-state";
+    private static final String READER_THREAD_NAME_PREFIX = "pulsar-reader-";
+    private static final String COMMITTER_THREAD_NAME = "pulsar-committer";
+    private static final String SUBSCRIPTION_NAME_OPTION_KEY = "subscriptionName";
+
+    /**
+     * The pulsar admin service address.
+     */
+    private final String adminUrl;
+
+    /**
+     * The pulsar service provider address.
+     */
+    private final String serviceUrl;
+
+    /**
+     * The pulsar topic name.
+     */
+    private final String topic;
+
+    /**
+     * The pulsar consumer group, used as the subscription name.
+     */
+    private final String consumerGroup;
+
+    /**
+     * The configuration for the pulsar consumer.
+     */
+    private final Configuration configuration;
+
+    /**
+     * The deserializer for records.
+     */
+    private final PulsarDeserializationSchema<T> deserializationSchema;
+
+    /**
+     * The pulsar client.
+     */
+    private transient PulsarClient client;
+
+    /**
+     * The pulsar admin.
+     */
+    private transient PulsarAdmin admin;
+
+    /**
+     * The readers for assigned partitions.
+     *
+     * <p>NOTE: The map can only be accessed in the task thread.</p>
+     */
+    private transient Map<String, PulsarPartitionReader> partitionReaders;
+
+    /**
+     * Accessor for state in the operator state backend.
+     */
+    private transient ListState<Tuple2<String, MessageId>> unionOffsetState;
+
+    /**
+     * The offsets of topic partitions restored from flink state.
+     *
+     * <p>NOTE: The map can only be accessed in the task thread.</p>
+     */
+    private transient Map<String, MessageId> restoredOffsets;
+
+    /**
+     * The current offsets of topic partitions, which are stored in flink state
+     * once a checkpoint is triggered.
+     *
+     * <p>NOTE: The map should be guarded by the checkpoint lock.</p>
+     */
+    private transient Map<String, MessageId> currentOffsets;
+
+    /**
+     * Flag indicating whether the consumer is still running.
+     **/
+    private transient volatile boolean isRunning;
+
+    /**
+     * The timeout for the reader's pulling.
+     */
+    private transient Duration receiveTimeout;
+
+    /**
+     * The last committed offsets for topic partition.
+     */
+    private transient Map<String, MessageId> lastCommittedOffsets;
+
+    /**
+     * The scheduler for auto commit tasks.
+     */
+    private transient ScheduledExecutorService executor;
+
+    public PulsarSourceFunction(
+            String adminUrl,
+            String serviceUrl,
+            String topic,
+            String consumerGroup,
+            PulsarDeserializationSchema<T> deserializationSchema,
+            Configuration configuration
+    ) {
+        Preconditions.checkNotNull(topic,
+                "The topic must not be null.");
+        Preconditions.checkNotNull(consumerGroup,
+                "The consumer group must not be null.");
+        Preconditions.checkNotNull(serviceUrl,
+                "The service url must not be null.");
+        Preconditions.checkNotNull(adminUrl,
+                "The admin url must not be null.");
+        Preconditions.checkNotNull(deserializationSchema,
+                "The deserialization schema must not be null.");
+        Preconditions.checkNotNull(configuration,
+                "The configuration must not be null.");
+
+        this.adminUrl = adminUrl;
+        this.serviceUrl = serviceUrl;
+        this.topic = topic;
+        this.consumerGroup = consumerGroup;
+        this.configuration = configuration;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public TypeInformation<T> getProducedType() {
+        return deserializationSchema.getProducedType();
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+
+        super.open(parameters);
+
+        LOG.info("Opening the pulsar source: adminUrl: {}, serviceUrl: {}, topic: {}, consumer group: {}.",
+                adminUrl, serviceUrl, topic, consumerGroup);
+
+        LOG.info("Pulsar source configuration: {}", configuration);
+
+        admin = PulsarUtils.createAdmin(adminUrl);
+        client = PulsarUtils.createClient(serviceUrl, configuration);
+        partitionReaders = new HashMap<>();
+        currentOffsets = new HashMap<>();
+        lastCommittedOffsets = new HashMap<>();
+
+        receiveTimeout =
+                parseDuration(configuration.getString(
+                        PulsarOptions.CONSUMER_RECEIVE_TIMEOUT));
+
+        isRunning = true;
+    }
+
+    @Override
+    public void run(SourceContext<T> context) throws Exception {
+
+        int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+        int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+        boolean isAutoCommitEnabled =
+                configuration.getBoolean(PulsarOptions.CONSUMER_ENABLE_AUTO_COMMIT);
+        if (isAutoCommitEnabled) {
+            Duration autoCommitInterval =
+                    parseDuration(configuration.getString(
+                            PulsarOptions.CONSUMER_AUTO_COMMIT_INTERVAL));
+
+            ThreadFactory threadFactory =
+                    new ExecutorThreadFactory(COMMITTER_THREAD_NAME);
+            executor =
+                    Executors.newScheduledThreadPool(1, threadFactory);
+            executor.scheduleWithFixedDelay(
+                    new AutoCommitTask(context),
+                    autoCommitInterval.toMillis(),
+                    autoCommitInterval.toMillis(),
+                    TimeUnit.MILLISECONDS
+            );
+        }
+
+        Duration checkPartitionInterval =
+                parseDuration(configuration.getString(
+                        PulsarOptions.CONSUMER_CHECK_PARTITION_INTERVAL));
+        Duration maxIdleTime =
+                parseDuration(configuration.getString(
+                        PulsarOptions.CONSUMER_MAX_IDLE_TIME));
+
+        Instant lastEmitInstant = Instant.MIN;
+
+        // set subscriptionName
+        HashMap<String, Object> readerConf = new HashMap<>();
+        readerConf.putIfAbsent(SUBSCRIPTION_NAME_OPTION_KEY, consumerGroup);
+
+        while (isRunning) {
+
+            // Discover new partitions
+            Set<String> topicPartitions = PulsarUtils.getTopicPartitions(admin, topic);
+
+            Set<String> assignedTopicPartitions =
+                    topicPartitions.stream()
+                            .filter(p -> (((((long) p.hashCode()) & 0xffffffffL) % numTasks) == taskIndex))
+                            .collect(Collectors.toCollection(TreeSet::new));
+
+            for (String topicPartition : assignedTopicPartitions) {
+                if (!partitionReaders.containsKey(topicPartition)) {
+
+                    MessageId bootstrapOffset =
+                            getBootstrapOffset(topicPartition);
+
+                    LOG.info("Discover a new topic partition {} starting from offset {}.", topicPartition,
+                            bootstrapOffset);
+
+                    Reader<byte[]> reader =
+                            PulsarUtils.createReader(
+                                    client,
+                                    topicPartition,
+                                    bootstrapOffset,
+                                    configuration,
+                                    readerConf
+                            );
+
+                    PulsarPartitionReader partitionReader =
+                            new PulsarPartitionReader(
+                                    context,
+                                    topicPartition,
+                                    reader
+                            );
+                    partitionReaders.put(topicPartition, partitionReader);
+
+                    partitionReader.start();
+                }
+            }
+
+            // Check the status of each partition reader
+            for (PulsarPartitionReader partitionReader :
+                    partitionReaders.values()) {
+
+                Throwable partitionThrowable = partitionReader.getThrowable();
+                if (partitionThrowable != null) {
+                    throw new IOException("Could not properly read messages from topic partition "
+                            + partitionReader.getTopicPartition() + ".", partitionThrowable);
+                }
+
+                Instant partitionLastEmitInstant =
+                        partitionReader.getLastEmitInstant();
+                if (partitionLastEmitInstant.compareTo(lastEmitInstant) > 0) {
+                    lastEmitInstant = partitionLastEmitInstant;
+                }
+            }
+
+            // Mark the consumer as idle if the idle time exceeds the limit.
+            Duration idleTime =
+                    Duration.between(lastEmitInstant, Instant.now());
+            if (idleTime.compareTo(maxIdleTime) > 0) {
+                context.markAsTemporarilyIdle();
+            }
+
+            //noinspection BusyWait
+            Thread.sleep(checkPartitionInterval.toMillis());
+        }
+
+        // Waits until all readers exit
+        for (PulsarPartitionReader partitionReader : partitionReaders.values()) {
+            try {
+                partitionReader.join();
+                partitionReader.close();
+            } catch (Throwable t) {
+                LOG.warn("Could not properly close the reader for topic partition {}.",
+                        partitionReader.getTopicPartition(), t);
+            }
+        }
+    }
+
+    @Override
+    public void initializeState(
+            FunctionInitializationContext context
+    ) throws Exception {
+        OperatorStateStore stateStore = context.getOperatorStateStore();
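+        // Union redistribution hands every subtask the full offset map on restore,
+        // so partitions can be re-claimed after the parallelism changes.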
+        unionOffsetState = stateStore.getUnionListState(
+                new ListStateDescriptor<>(
+                        OFFSETS_STATE_NAME,
+                        TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {
+                        })
+                )
+        );
+
+        restoredOffsets = new TreeMap<>();
+        if (context.isRestored()) {
+            for (Tuple2<String, MessageId> restoredOffset :
+                    unionOffsetState.get()) {
+                String topicPartition = restoredOffset.f0;
+                MessageId offset = restoredOffset.f1;
+
+                restoredOffsets.put(topicPartition, offset);
+            }
+
+            LOG.info("The pulsar source successfully restores the offsets ({}).", restoredOffsets);
+        } else {
+            LOG.info("The pulsar source does not restore from any checkpoint.");
+        }
+    }
+
+    @Override
+    public void snapshotState(
+            FunctionSnapshotContext context
+    ) throws Exception {
+
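+        // snapshotState is invoked under the checkpoint lock, so currentOffsets is stable here.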
+        unionOffsetState.clear();
+        for (Map.Entry<String, MessageId> entry : currentOffsets.entrySet()) {
+            unionOffsetState.add(Tuple2.of(entry.getKey(), entry.getValue()));
+        }
+
+        LOG.info("Successfully save the offsets {} in checkpoint {}.",
+                currentOffsets, context.getCheckpointId());
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+
+    @Override
+    public void close() throws Exception {
+
+        cancel();
+
+        if (executor != null) {
+            try {
+                executor.shutdown();
+            } catch (Throwable t) {
+                LOG.warn("Could not properly shutdown the executor.", t);
+            }
+        }
+
+        if (client != null) {
+            try {
+                client.close();
+            } catch (Throwable t) {
+                LOG.warn("Could not properly close the pulsar client.", t);
+            }
+        }
+
+        if (admin != null) {
+            try {
+                admin.close();
+            } catch (Throwable t) {
+                LOG.warn("Could not properly close the pulsar admin.", t);
+            }
+        }
+
+        super.close();
+    }
+
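+    /**
+     * Determines the starting offset for a partition: an offset restored from a checkpoint
+     * takes precedence, then the configured bootstrap mode (earliest/latest), and otherwise
+     * the offset committed for the consumer group is used.
+     */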
+    private MessageId getBootstrapOffset(
+            String topicPartition
+    ) throws PulsarAdminException {
+        MessageId restoredOffset = restoredOffsets.get(topicPartition);
+        if (restoredOffset != null) {
+            return restoredOffset;
+        }
+
+        String bootstrapMode = configuration.getString(
+                PulsarOptions.CONSUMER_BOOTSTRAP_MODE);
+
+        switch (bootstrapMode) {
+            case PulsarOptions.BOOTSTRAP_MODE_EARLIEST:
+                return MessageId.earliest;
+            case PulsarOptions.BOOTSTRAP_MODE_LATEST:
+                return MessageId.latest;
+            default:
+                return PulsarUtils.getTopicPartitionOffset(admin, topicPartition, consumerGroup);
+        }
+    }
+
+    private class PulsarPartitionReader extends Thread implements AutoCloseable {
+
+        private final SourceContext<T> context;
+
+        private final String topicPartition;
+
+        private final Reader<byte[]> reader;
+
+        private volatile Throwable throwable;
+
+        private volatile Instant lastEmitInstant;
+
+        PulsarPartitionReader(
+                SourceContext<T> context,
+                String topicPartition,
+                Reader<byte[]> reader
+        ) {
+            super(READER_THREAD_NAME_PREFIX + topicPartition);
+
+            this.context = context;
+            this.topicPartition = topicPartition;
+            this.reader = reader;
+            this.throwable = null;
+            this.lastEmitInstant = Instant.MIN;
+        }
+
+        String getTopicPartition() {
+            return topicPartition;
+        }
+
+        Throwable getThrowable() {
+            return throwable;
+        }
+
+        Instant getLastEmitInstant() {
+            return lastEmitInstant;
+        }
+
+        @Override
+        public void run() {
+            try {
+                LOG.info("The reader for {} starts.", topicPartition);
+
+                while (isRunning) {
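+                    // readNext with a timeout returns null when nothing arrives, letting
+                    // the loop re-check isRunning and observe cancellation promptly.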
+                    //noinspection rawtypes
+                    Message message =
+                            reader.readNext(
+                                    (int) receiveTimeout.toMillis(),
+                                    TimeUnit.MILLISECONDS
+                            );
+
+                    if (message != null) {
+                        String topicPartition = message.getTopicName();
+                        MessageId offset = message.getMessageId();
+
+                        DeserializationResult<T> result = deserializationSchema.deserialize(message);
+
+                        synchronized (context.getCheckpointLock()) {
+                            context.collect(result.getRecord());
+                            currentOffsets.put(topicPartition, offset);
+                        }
+
+                        lastEmitInstant = Instant.now();
+                    } else {
+                        LOG.debug("The reader read is null, maybe it reached the end of the topic:{}", topicPartition);
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.error("Error occurred of partition reader", t);
+                throwable = t;
+            } finally {
+                LOG.info("The reader for {} exits.", topicPartition);
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            reader.close();
+        }
+    }
+
+    private class AutoCommitTask implements Runnable {
+
+        private final SourceContext<T> context;
+
+        private AutoCommitTask(SourceContext<T> context) {
+            this.context = context;
+        }
+
+        @Override
+        public void run() {
+            try {
+
+                if (!isRunning) {
+                    return;
+                }
+
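+                // Copy the offsets under the checkpoint lock to get a consistent snapshot.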
+                Map<String, MessageId> currentOffsetsCopy;
+                synchronized (context.getCheckpointLock()) {
+                    currentOffsetsCopy = new HashMap<>(currentOffsets);
+                }
+
+                for (Map.Entry<String, MessageId> entry : currentOffsetsCopy.entrySet()) {
+                    String topicPartition = entry.getKey();
+                    MessageId currentOffset = entry.getValue();
+
+                    MessageId lastCommittedOffset =
+                            lastCommittedOffsets.get(topicPartition);
+
+                    // Skips the committing if the offset is not changed.
+                    if (Objects.equals(currentOffset, lastCommittedOffset)) {
+                        continue;
+                    }
+
+                    PulsarUtils.commitTopicPartitionOffset(
+                            admin,
+                            topicPartition,
+                            currentOffset,
+                            consumerGroup
+                    );
+
+                    lastCommittedOffsets.put(topicPartition, currentOffset);
+                }
+            } catch (Throwable throwable) {
+                LOG.warn("Could not properly commit the offset.", throwable);
+            }
+        }
+    }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarUtils.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarUtils.java
new file mode 100644
index 0000000..a7e453a
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/PulsarUtils.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import static org.apache.flink.util.TimeUtils.parseDuration;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.CONNECTION_TIMEOUT;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.CONSUMER_RECEIVE_QUEUE_SIZE;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.KEEPALIVE_INTERVAL;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.OPERATION_TIMEOUT;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.PRODUCER_BLOCK_QUEUE_FULL;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.PRODUCER_PENDING_QUEUE_SIZE;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.PRODUCER_PENDING_SIZE;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.PRODUCER_ROUTE_MODE;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.PRODUCER_SEND_TIMEOUT;
+import static org.apache.inlong.sort.flink.pulsar.PulsarOptions.REQUEST_TIMEOUT;
+import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
+import static org.apache.pulsar.common.naming.TopicName.getPartitionIndex;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The utility class for pulsar.
+ */
+public class PulsarUtils {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PulsarUtils.class);
+
+    public static PulsarClient createClient(
+            String serviceUrl,
+            Configuration configuration
+    ) throws PulsarClientException {
+
+        Duration operationTimeout =
+                parseDuration(configuration.getString(OPERATION_TIMEOUT));
+        Duration connectionTimeout =
+                parseDuration(configuration.getString(CONNECTION_TIMEOUT));
+        Duration requestTimeout =
+                parseDuration(configuration.getString(REQUEST_TIMEOUT));
+        Duration keepAliveInterval =
+                parseDuration(configuration.getString(KEEPALIVE_INTERVAL));
+
+        ClientConfigurationData clientConfigurationData =
+                new ClientConfigurationData();
+        clientConfigurationData
+                .setServiceUrl(serviceUrl);
+        clientConfigurationData
+                .setOperationTimeoutMs(operationTimeout.toMillis());
+        clientConfigurationData
+                .setConnectionTimeoutMs((int) connectionTimeout.toMillis());
+        clientConfigurationData
+                .setRequestTimeoutMs((int) requestTimeout.toMillis());
+        clientConfigurationData
+                .setKeepAliveIntervalSeconds((int) keepAliveInterval.getSeconds());
+
+        return new PulsarClientImpl(clientConfigurationData);
+    }
+
+    public static Reader<byte[]> createReader(
+            PulsarClient client,
+            String topicPartition,
+            MessageId bootstrapOffset,
+            Configuration configuration,
+            Map<String, Object> readerConf
+    ) throws PulsarClientException {
+
+        int receiveQueueSize =
+                configuration.getInteger(CONSUMER_RECEIVE_QUEUE_SIZE);
+
+        return client
+                .newReader()
+                .topic(topicPartition)
+                .startMessageId(bootstrapOffset)
+                .receiverQueueSize(receiveQueueSize)
+                .loadConf(readerConf)
+                .create();
+    }
+
+    public static Producer<byte[]> createProducer(
+            PulsarClient client,
+            String topic,
+            Configuration configuration
+    ) throws PulsarClientException {
+        // Read the configured routing mode; getString falls back to the option's default value.
+        MessageRoutingMode routeMode = MessageRoutingMode.valueOf(
+                configuration.getString(PRODUCER_ROUTE_MODE));
+        Duration sendTimeout =
+                parseDuration(configuration.getString(PRODUCER_SEND_TIMEOUT));
+        int pendingQueueSize =
+                configuration.getInteger(PRODUCER_PENDING_QUEUE_SIZE);
+        int pendingSize =
+                configuration.getInteger(PRODUCER_PENDING_SIZE);
+        boolean blockIfQueueFull =
+                configuration.getBoolean(PRODUCER_BLOCK_QUEUE_FULL);
+
+        return client
+                .newProducer()
+                .topic(topic)
+                .messageRoutingMode(routeMode)
+                .sendTimeout((int) sendTimeout.toMillis(), TimeUnit.MILLISECONDS)
+                .maxPendingMessages(pendingQueueSize)
+                .maxPendingMessagesAcrossPartitions(pendingSize)
+                .blockIfQueueFull(blockIfQueueFull)
+                .create();
+    }
+
+    public static PulsarAdmin createAdmin(
+            String adminUrl
+    ) throws PulsarClientException {
+        return PulsarAdmin.builder().serviceHttpUrl(adminUrl).build();
+    }
+
+    public static Set<String> getTopicPartitions(
+            PulsarAdmin admin,
+            String topic
+    ) throws PulsarAdminException {
+        Set<String> topicPartitions = new TreeSet<>();
+
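+        // A non-partitioned topic reports zero partitions and is consumed as a single partition.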
+        int numTopicPartitions =
+                admin.topics().getPartitionedTopicMetadata(topic).partitions;
+        if (numTopicPartitions == 0) {
+            topicPartitions.add(topic);
+        } else {
+            for (int i = 0; i < numTopicPartitions; ++i) {
+                topicPartitions.add(topic + PARTITIONED_TOPIC_SUFFIX + i);
+            }
+        }
+
+        return topicPartitions;
+    }
+
+    public static MessageId getTopicPartitionOffset(
+            PulsarAdmin admin,
+            String topicPartition,
+            String consumerGroup
+    ) throws PulsarAdminException {
+        TopicStats topicStats = admin.topics().getStats(topicPartition);
+        if (topicStats.subscriptions.containsKey(consumerGroup)) {
+            SubscriptionStats subStats =
+                    topicStats.subscriptions.get(consumerGroup);
+            if (subStats.consumers.size() != 0) {
+                throw new RuntimeException("The subscription is actively used by other consumers; in this "
+                        + "situation the exactly-once semantics cannot be guaranteed.");
+            } else {
+                PersistentTopicInternalStats.CursorStats cursorStats =
+                        admin
+                                .topics()
+                                .getInternalStats(topicPartition)
+                                .cursors
+                                .get(consumerGroup);
+
+                String[] ids =
+                        cursorStats.markDeletePosition.split(":", 2);
+                long ledgerId = Long.parseLong(ids[0]);
+                long entryId = Long.parseLong(ids[1]);
+                int partitionIndex = getPartitionIndex(topicPartition);
+
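+                // markDeletePosition is the last acknowledged entry, so resume from the next one.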
+                return new MessageIdImpl(
+                        ledgerId,
+                        entryId + 1,
+                        partitionIndex
+                );
+            }
+        } else {
+            throw new RuntimeException("Could not find consumer group '" + consumerGroup + "' for topic partition "
+                    + topicPartition + ".");
+        }
+    }
+
+    public static void commitTopicPartitionOffset(
+            PulsarAdmin admin,
+            String topicPartition,
+            MessageId offset,
+            String consumerGroup
+    ) {
+        try {
+            admin.topics().resetCursor(topicPartition, consumerGroup, offset);
+        } catch (Throwable e) {
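+            // 404 (topic not found) and 412 (precondition failed) indicate the topic or
+            // cursor no longer exists, which is tolerated; anything else is fatal.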
+            if (e instanceof PulsarAdminException
+                    && (((PulsarAdminException) e).getStatusCode() == 404
+                    || ((PulsarAdminException) e).getStatusCode() == 412)) {
+                LOG.info("Cannot commit cursor since the topic {} has been deleted during execution", topicPartition);
+            } else {
+                throw new RuntimeException(String.format("Failed to commit cursor for %s", topicPartition), e);
+            }
+        }
+    }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/SerializedRecordDeserializationSchema.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/SerializedRecordDeserializationSchema.java
new file mode 100644
index 0000000..f1db602
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/flink/pulsar/SerializedRecordDeserializationSchema.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import java.io.IOException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.inlong.sort.flink.SerializedRecord;
+import org.apache.pulsar.client.api.Message;
+
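+/**
+ * A {@link PulsarDeserializationSchema} that wraps the raw message payload into a
+ * {@link SerializedRecord} tagged with the owning data flow id and the message event time.
+ */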
+public class SerializedRecordDeserializationSchema implements PulsarDeserializationSchema<SerializedRecord> {
+
+    private static final long serialVersionUID = -374382043299290093L;
+
+    private final long dataFlowId;
+
+    public SerializedRecordDeserializationSchema(long dataFlowId) {
+        this.dataFlowId = dataFlowId;
+    }
+
+    @Override
+    public DeserializationResult<SerializedRecord> deserialize(@SuppressWarnings("rawtypes") Message message)
+            throws IOException {
+        final byte[] data = message.getData();
+        return DeserializationResult.of(new SerializedRecord(dataFlowId, message.getEventTime(), data), data.length);
+    }
+
+    @Override
+    public TypeInformation<SerializedRecord> getProducedType() {
+        return TypeInformation.of(SerializedRecord.class);
+    }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunctionTest.java
new file mode 100644
index 0000000..c1cfb96
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/MultiTopicPulsarSourceFunctionTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.typeutils.runtime.RowSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.flink.SerializedRecord;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.util.CommonUtils;
+import org.apache.inlong.sort.util.TestMetaManagerUtil;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Ignore
+public class MultiTopicPulsarSourceFunctionTest {
+
+    public static final String TEST_TOPIC = "test_topic";
+    public static final String CONSUMER_GROUP = "test";
+    private static final int TOTAL_COUNT = 5;
+    private static final DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.2.0");
+    private PulsarContainer pulsar;
+    private TestMetaManagerUtil testMetaManagerUtil;
+    private StreamExecutionEnvironment env;
+
+    @Before
+    public void startPulsarContainer() throws Exception {
+        pulsar = new PulsarContainer(PULSAR_IMAGE);
+        pulsar.start();
+        producerMessageToPulsar(pulsar.getPulsarBrokerUrl());
+        testMetaManagerUtil = new PulsarTestMetaManagerUtil();
+        testMetaManagerUtil
+                .initDataFlowInfo(pulsar.getHttpServiceUrl(), pulsar.getPulsarBrokerUrl(), TEST_TOPIC, CONSUMER_GROUP);
+    }
+
+    @Test
+    public void testPulsarSourceFunction() throws Exception {
+        MultiTopicPulsarSourceFunction source = new MultiTopicPulsarSourceFunction(
+                testMetaManagerUtil.getConfig());
+        TestSourceContext<SerializedRecord> sourceContext = new TestSourceContext<>();
+        source.setRuntimeContext(
+                new MockStreamingRuntimeContext(false, 1, 0));
+
+        source.initializeState(
+                new MockFunctionInitializationContext(
+                        false,
+                        new MockOperatorStateStore(null, null)));
+        source.open(new Configuration());
+        final CheckedThread runThread =
+                new CheckedThread() {
+                    @Override
+                    public void go() throws Exception {
+                        source.run(sourceContext);
+                    }
+                };
+        runThread.start();
+        List<SerializedRecord> records = drain(sourceContext, 5);
+        assertEquals(5, records.size());
+        source.cancel();
+        source.close();
+        runThread.stop();
+    }
+
+    protected void producerMessageToPulsar(String pulsarBrokerUrl) throws Exception {
+        RowSerializer rowSerializer = CommonUtils.generateRowSerializer(new RowFormatInfo(
+                new String[] {"f1", "f2"},
+                new FormatInfo[] {StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE}));
+        DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(1024);
+
+        try (
+                PulsarClient client = PulsarClient.builder()
+                        .serviceUrl(pulsarBrokerUrl)
+                        .build();
+                Producer<byte[]> producer = client.newProducer()
+                        .topic(TEST_TOPIC)
+                        .create()
+        ) {
+            for (int cnt = 0; cnt < TOTAL_COUNT; cnt++) {
+                Row row = Row.of(String.valueOf(cnt), String.valueOf(cnt));
+                // Reset the serializer so each message carries exactly one serialized row.
+                dataOutputSerializer.clear();
+                rowSerializer.serialize(row, dataOutputSerializer);
+                ByteArrayOutputStream byt = new ByteArrayOutputStream();
+                ObjectOutputStream objectOutputStream = new ObjectOutputStream(byt);
+                objectOutputStream.writeObject(new SerializedRecord(1L, 0, dataOutputSerializer.getCopyOfBuffer()));
+                // Flush the ObjectOutputStream so all buffered bytes reach the array before sending.
+                objectOutputStream.flush();
+                producer.send(byt.toByteArray());
+            }
+        }
+    }
+
+    @After
+    public void stopPulsarContainer() {
+        if (pulsar != null) {
+            pulsar.close();
+        }
+    }
+
+    private static <T> List<T> drain(TestSourceContext<T> sourceContext, int expectedRecordCount)
+            throws Exception {
+        List<T> allRecords = new ArrayList<>();
+        LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
+        while (allRecords.size() < expectedRecordCount) {
+            StreamRecord<T> record = queue.poll(100, TimeUnit.SECONDS);
+            if (record != null) {
+                allRecords.add(record.getValue());
+            } else {
+                throw new RuntimeException(
+                        "Can't receive " + expectedRecordCount + " elements before timeout.");
+            }
+        }
+
+        return allRecords;
+    }
+
+    private static class MockOperatorStateStore implements OperatorStateStore {
+
+        private final ListState<?> restoredOffsetListState;
+        private final ListState<?> restoredHistoryListState;
+
+        private MockOperatorStateStore(
+                ListState<?> restoredOffsetListState, ListState<?> restoredHistoryListState) {
+            this.restoredOffsetListState = restoredOffsetListState;
+            this.restoredHistoryListState = restoredHistoryListState;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor)
+                throws Exception {
+            // Return the offset state supplied at construction so restore scenarios can be mocked.
+            return (ListState<S>) restoredOffsetListState;
+        }
+
+        @Override
+        public <K, V> BroadcastState<K, V> getBroadcastState(
+                MapStateDescriptor<K, V> stateDescriptor) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor)
+                throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Set<String> getRegisteredStateNames() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Set<String> getRegisteredBroadcastStateNames() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <S> ListState<S> getOperatorState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
+            return null;
+        }
+
+        @Override
+        public <T extends Serializable> ListState<T> getSerializableListState(String s) throws Exception {
+            return null;
+        }
+    }
+
+    private static class MockFunctionInitializationContext
+            implements FunctionInitializationContext {
+
+        private final boolean isRestored;
+        private final OperatorStateStore operatorStateStore;
+
+        private MockFunctionInitializationContext(
+                boolean isRestored, OperatorStateStore operatorStateStore) {
+            this.isRestored = isRestored;
+            this.operatorStateStore = operatorStateStore;
+        }
+
+        @Override
+        public boolean isRestored() {
+            return isRestored;
+        }
+
+        @Override
+        public OperatorStateStore getOperatorStateStore() {
+            return operatorStateStore;
+        }
+
+        @Override
+        public KeyedStateStore getKeyedStateStore() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/PulsarTestMetaManagerUtil.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/PulsarTestMetaManagerUtil.java
new file mode 100644
index 0000000..7f41a1c
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/PulsarTestMetaManagerUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.inlong.sort.ZkTools;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.CsvDeserializationInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HivePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.TextFileFormat;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
+import org.apache.inlong.sort.util.TestMetaManagerUtil;
+
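+/**
+ * A {@link TestMetaManagerUtil} that prepares a Pulsar-to-Hive data flow for tests.
+ */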
+public class PulsarTestMetaManagerUtil extends TestMetaManagerUtil {
+
+    public PulsarTestMetaManagerUtil() throws Exception {
+        super();
+    }
+
+    @Override
+    public void addOrUpdateDataFlowInfo(long dataFlowId, String... args) throws Exception {
+        ZkTools.addDataFlowToCluster(cluster, dataFlowId, ZOOKEEPER.getConnectString(), zkRoot);
+        ZkTools.updateDataFlowInfo(
+                prepareDataFlowInfo(dataFlowId, args[0], args[1], args[2], args[3]),
+                cluster,
+                dataFlowId,
+                ZOOKEEPER.getConnectString(),
+                zkRoot);
+    }
+
+    @Override
+    public void initDataFlowInfo(String... args) throws Exception {
+        addOrUpdateDataFlowInfo(1L, args[0], args[1], args[2], args[3]);
+    }
+
+    @Override
+    public DataFlowInfo prepareDataFlowInfo(long dataFlowId, String... args) {
+
+        FieldInfo[] pulsarFields = new FieldInfo[] {
+                new FieldInfo("f1", StringFormatInfo.INSTANCE),
+                new FieldInfo("f2", StringFormatInfo.INSTANCE)
+        };
+
+        Map<String, Object> config = new HashMap<>();
+        config.put("pulsar.source.consumer.bootstrap-mode", "earliest");
+
+        return new DataFlowInfo(
+                dataFlowId,
+                new PulsarSourceInfo(
+                        args[0],
+                        args[1],
+                        args[2],
+                        args[3],
+                        new CsvDeserializationInfo(','),
+                        pulsarFields),
+                new HiveSinkInfo(
+                        new FieldInfo[0],
+                        "testServerJdbcUrl",
+                        "testDatabaseName",
+                        "testTableName",
+                        "testUsername",
+                        "testPassword",
+                        "testDataPath",
+                        new HivePartitionInfo[0],
+                        new TextFileFormat(',')),
+                config
+        );
+    }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/TestSourceContext.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/TestSourceContext.java
new file mode 100644
index 0000000..1d6ff06
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/flink/pulsar/TestSourceContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.flink.pulsar;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * A testable {@link SourceFunction.SourceContext}.
+ */
+public class TestSourceContext<T> implements SourceFunction.SourceContext<T> {
+
+    private final Object checkpointLock = new Object();
+
+    private LinkedBlockingQueue<StreamRecord<T>> collectedOutputs = new LinkedBlockingQueue<>();
+
+    @Override
+    public void collect(T element) {
+        this.collectedOutputs.add(new StreamRecord<>(element));
+    }
+
+    @Override
+    public void collectWithTimestamp(T element, long timestamp) {
+        this.collectedOutputs.offer(new StreamRecord<>(element, timestamp));
+    }
+
+    @Override
+    public void emitWatermark(Watermark mark) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void markAsTemporarilyIdle() {
+
+    }
+
+    @Override
+    public Object getCheckpointLock() {
+        return checkpointLock;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    public StreamRecord<T> removeLatestOutput() {
+        return collectedOutputs.poll();
+    }
+
+    public LinkedBlockingQueue<StreamRecord<T>> getCollectedOutputs() {
+        return collectedOutputs;
+    }
+}
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/util/TestMetaManagerUtil.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/util/TestMetaManagerUtil.java
new file mode 100644
index 0000000..9b4c386
--- /dev/null
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/util/TestMetaManagerUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.util;
+
+import org.apache.inlong.sort.ZkTools;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+
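+/**
+ * A test harness that sets up an embedded ZooKeeper and a {@link MetaManager}
+ * for managing data flow metadata in tests.
+ */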
+public abstract class TestMetaManagerUtil {
+
+    protected static ZooKeeperTestEnvironment ZOOKEEPER;
+
+    protected final String cluster = "cluster";
+
+    protected final String zkRoot = "/test";
+
+    protected final Configuration config = new Configuration();
+
+    protected final MetaManager metaManager;
+
+    public TestMetaManagerUtil() throws Exception {
+        ZOOKEEPER = new ZooKeeperTestEnvironment(1);
+        config.setString(Constants.CLUSTER_ID, cluster);
+        config.setString(Constants.ZOOKEEPER_QUORUM, ZOOKEEPER.getConnectString());
+        config.setString(Constants.ZOOKEEPER_ROOT, zkRoot);
+        metaManager = MetaManager.getInstance(config);
+    }
+
+    public Configuration getConfig() {
+        return config;
+    }
+
+    public void deleteDataFlowInfo(long dataFlowId) throws Exception {
+        ZkTools.removeDataFlowFromCluster(cluster, dataFlowId, ZOOKEEPER.getConnectString(), zkRoot);
+    }
+
+    public abstract void addOrUpdateDataFlowInfo(long dataFlowId, String... args) throws Exception;
+
+    public abstract void initDataFlowInfo(String... args) throws Exception;
+
+    public abstract DataFlowInfo prepareDataFlowInfo(long dataFlowId, String... args);
+
+    public void cleanup() throws Exception {
+        MetaManager.release();
+        ZOOKEEPER.shutdown();
+    }
+
+}