You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/17 10:02:00 UTC

[incubator-seatunnel] branch api-draft updated: [api-draft][connector] apache pulsar source (#1984)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new a8d47c97f [api-draft][connector] apache pulsar source (#1984)
a8d47c97f is described below

commit a8d47c97f2fabb79e53d78eb351b8cb94f58b2ce
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Fri Jun 17 18:01:54 2022 +0800

    [api-draft][connector] apache pulsar source (#1984)
    
    * [api-draft][connector] apache pulsar source
    
    # Conflicts:
    #       seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
    
    * [api-draft][connector] fix type not found
    
    * [api-draft][connector] add pulsar dependencies & licenses
    
    # Conflicts:
    #       seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
    
    * [api-draft][connector] fix deserialization schema
    
    * [api-draft][connector] pulsar source boundedness
---
 .../apache/seatunnel/common/PropertiesUtil.java    |  37 ++
 .../seatunnel-connectors-seatunnel-dist/pom.xml    |   7 +-
 .../seatunnel-connectors-seatunnel/pom.xml         |   3 +-
 .../seatunnel-connector-seatunnel-pulsar/pom.xml   |  99 ++++
 .../seatunnel/pulsar/config/BasePulsarConfig.java  |  30 +-
 .../seatunnel/pulsar/config/PulsarAdminConfig.java |  76 +++
 .../pulsar/config/PulsarClientConfig.java          |  80 +++
 .../seatunnel/pulsar/config/PulsarConfigUtil.java  |  81 +++
 .../pulsar/config/PulsarConsumerConfig.java        |  60 ++
 .../seatunnel/pulsar/config/SourceProperties.java  | 119 ++++
 .../seatunnel/pulsar/source/PulsarSource.java      | 293 ++++++++++
 .../source/enumerator/PulsarSplitEnumerator.java   | 281 +++++++++
 .../enumerator/PulsarSplitEnumeratorState.java     |  36 ++
 .../cursor/start/MessageIdStartCursor.java         |  65 +++
 .../enumerator/cursor/start/StartCursor.java       |  78 +++
 .../cursor/start/SubscriptionStartCursor.java      |  63 ++
 .../cursor/start/TimestampStartCursor.java         |  40 ++
 .../cursor/stop/LatestMessageStopCursor.java       |  59 ++
 .../cursor/stop/MessageIdStopCursor.java           |  58 ++
 .../enumerator/cursor/stop/NeverStopCursor.java    |  49 ++
 .../source/enumerator/cursor/stop/StopCursor.java  |  63 ++
 .../cursor/stop/TimestampStopCursor.java           |  44 ++
 .../enumerator/discoverer/PulsarDiscoverer.java    |  48 ++
 .../enumerator/discoverer/TopicListDiscoverer.java |  63 ++
 .../discoverer/TopicPatternDiscoverer.java         |  98 ++++
 .../source/enumerator/topic/TopicPartition.java    |  91 +++
 .../seatunnel/pulsar/source/reader/Handover.java   |  88 +++
 .../pulsar/source/reader/PulsarSourceReader.java   | 221 +++++++
 .../source/reader/PulsarSplitReaderThread.java     | 143 +++++
 .../pulsar/source/reader/RecordWithSplitId.java    |  30 +-
 .../pulsar/source/split/PulsarPartitionSplit.java  |  94 +++
 seatunnel-dist/release-docs/LICENSE                |  12 +
 seatunnel-dist/release-docs/NOTICE                 |   8 +
 .../release-docs/licenses/LICENSE-bouncycastle.txt |   7 +
 .../release-docs/licenses/LICENSE-jaxrs-api.txt    | 637 +++++++++++++++++++++
 tools/dependencies/known-dependencies.txt          |  12 +
 36 files changed, 3237 insertions(+), 36 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
index e9052bfe6..7d49900e5 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
@@ -19,7 +19,11 @@ package org.apache.seatunnel.common;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.commons.lang3.StringUtils;
+
 import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 public final class PropertiesUtil {
 
@@ -39,4 +43,37 @@ public final class PropertiesUtil {
             }
         });
     }
+
+    public static <E extends Enum<E>> E getEnum(final Config conf, final String key, final Class<E> enumClass, final E defaultEnum) {
+        if (!conf.hasPath(key)) {
+            return defaultEnum;
+        }
+        final String value = conf.getString(key);
+        if (StringUtils.isBlank(value)) {
+            return defaultEnum;
+        }
+        return Enum.valueOf(enumClass, value.toUpperCase());
+    }
+
+    public static <T> void setOption(Config config, String optionName, T defaultValue, Function<String, T> getter, Consumer<T> setter) {
+        T value;
+        if (config.hasPath(optionName)) {
+            value = getter.apply(optionName);
+        } else {
+            value = defaultValue;
+        }
+        if (value != null) {
+            setter.accept(value);
+        }
+    }
+
+    public static <T> void setOption(Config config, String optionName, Function<String, T> getter, Consumer<T> setter) {
+        T value = null;
+        if (config.hasPath(optionName)) {
+            value = getter.apply(optionName);
+        }
+        if (value != null) {
+            setter.accept(value);
+        }
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
index 46444870f..5c777fcee 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel-dist/pom.xml
@@ -70,6 +70,11 @@
             <artifactId>seatunnel-connector-seatunnel-clickhouse</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connector-seatunnel-pulsar</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -94,4 +99,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
index e7ae04ab2..66efaecc9 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/pom.xml
@@ -39,5 +39,6 @@
         <module>seatunnel-connector-seatunnel-jdbc</module>
         <module>seatunnel-connector-seatunnel-socket</module>
         <module>seatunnel-connector-seatunnel-clickhouse</module>
+        <module>seatunnel-connector-seatunnel-pulsar</module>
     </modules>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/pom.xml b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/pom.xml
new file mode 100644
index 000000000..51db4baf3
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>seatunnel-connectors-seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-connector-seatunnel-pulsar</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- Pulsar testing environment -->
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Pulsar bundles the latest bookkeeper & zookeeper, -->
+        <!-- we don't override the version here. -->
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>testmocks</artifactId>
+            <version>${pulsar.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.testng</groupId>
+                    <artifactId>testng</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.powermock</groupId>
+                    <artifactId>powermock-module-testng</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-broker</artifactId>
+            <version>${pulsar.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- Pulsar use a newer commons-lang3 in broker. -->
+        <!-- Bump the version only for testing. -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Add Pulsar 2.x as a dependency. -->
+        <!-- Move this to button for avoiding class conflicts with pulsar-broker. -->
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/BasePulsarConfig.java
similarity index 53%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/BasePulsarConfig.java
index e9052bfe6..ec6f2fb7b 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/BasePulsarConfig.java
@@ -15,28 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.common;
+package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import java.io.Serializable;
 
-import java.util.Properties;
+public abstract class BasePulsarConfig implements Serializable {
+    private final String authPluginClassName;
+    private final String authParams;
 
-public final class PropertiesUtil {
+    public BasePulsarConfig(String authPluginClassName, String authParams) {
+        this.authPluginClassName = authPluginClassName;
+        this.authParams = authParams;
+    }
 
-    private PropertiesUtil() {
+    public String getAuthPluginClassName() {
+        return authPluginClassName;
     }
 
-    public static void setProperties(Config config, Properties properties, String prefix, boolean keepPrefix) {
-        config.entrySet().forEach(entry -> {
-            String key = entry.getKey();
-            Object value = entry.getValue().unwrapped();
-            if (key.startsWith(prefix)) {
-                if (keepPrefix) {
-                    properties.put(key, value);
-                } else {
-                    properties.put(key.substring(prefix.length()), value);
-                }
-            }
-        });
+    public String getAuthParams() {
+        return authParams;
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarAdminConfig.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarAdminConfig.java
new file mode 100644
index 000000000..119e97175
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarAdminConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
+
+import org.apache.pulsar.shade.com.google.common.base.Preconditions;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+// TODO: more field
+
+public class PulsarAdminConfig extends BasePulsarConfig {
+    private static final long serialVersionUID = 1L;
+    private final String adminUrl;
+
+    private PulsarAdminConfig(String authPluginClassName, String authParams, String adminUrl) {
+        super(authPluginClassName, authParams);
+        this.adminUrl = adminUrl;
+    }
+
+    public String getAdminUrl() {
+        return adminUrl;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+        /**
+         * Name of the authentication plugin.
+         */
+        private String authPluginClassName = "";
+        /**
+         * Parameters for the authentication plugin.
+         */
+        private String authParams = "";
+        private String adminUrl;
+
+        private Builder() {
+        }
+
+        public Builder authPluginClassName(String authPluginClassName) {
+            this.authPluginClassName = authPluginClassName;
+            return this;
+        }
+
+        public Builder authParams(String authParams) {
+            this.authParams = authParams;
+            return this;
+        }
+
+        public Builder adminUrl(String adminUrl) {
+            this.adminUrl = adminUrl;
+            return this;
+        }
+
+        public PulsarAdminConfig build() {
+            Preconditions.checkArgument(StringUtils.isNotBlank(adminUrl), "Pulsar admin URL is required.");
+            return new PulsarAdminConfig(authPluginClassName, authParams, adminUrl);
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarClientConfig.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarClientConfig.java
new file mode 100644
index 000000000..cd1209151
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarClientConfig.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
+
+import org.apache.pulsar.shade.com.google.common.base.Preconditions;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+// TODO: more field
+
+public class PulsarClientConfig extends BasePulsarConfig {
+    private static final long serialVersionUID = 1L;
+
+    private final String serviceUrl;
+
+    private PulsarClientConfig(String authPluginClassName, String authParams, String serviceUrl) {
+        super(authPluginClassName, authParams);
+        this.serviceUrl = serviceUrl;
+    }
+
+    public String getServiceUrl() {
+        return serviceUrl;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+        /**
+         * Name of the authentication plugin.
+         */
+        private String authPluginClassName = "";
+        /**
+         * Parameters for the authentication plugin.
+         */
+        private String authParams = "";
+        /**
+         * Service URL provider for Pulsar service.
+         */
+        private String serviceUrl;
+
+        private Builder() {
+        }
+
+        public Builder authPluginClassName(String authPluginClassName) {
+            this.authPluginClassName = authPluginClassName;
+            return this;
+        }
+
+        public Builder authParams(String authParams) {
+            this.authParams = authParams;
+            return this;
+        }
+
+        public Builder serviceUrl(String serviceUrl) {
+            this.serviceUrl = serviceUrl;
+            return this;
+        }
+
+        public PulsarClientConfig build() {
+            Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "Pulsar service URL is required.");
+            return new PulsarClientConfig(authPluginClassName, authParams, serviceUrl);
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
new file mode 100644
index 000000000..2d81ae494
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConfigUtil.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+public class PulsarConfigUtil {
+
+    private PulsarConfigUtil() {
+    }
+
+    public static PulsarAdmin createAdmin(PulsarAdminConfig config) {
+        PulsarAdminBuilder builder = PulsarAdmin.builder();
+        builder.serviceHttpUrl(config.getAdminUrl());
+        builder.authentication(createAuthentication(config));
+        try {
+            return builder.build();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static PulsarClient createClient(PulsarClientConfig config) {
+        ClientBuilder builder = PulsarClient.builder();
+        builder.serviceUrl(config.getServiceUrl());
+        builder.authentication(createAuthentication(config));
+        try {
+            return builder.build();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static ConsumerBuilder<byte[]> createConsumerBuilder(
+        PulsarClient client, PulsarConsumerConfig config) {
+        ConsumerBuilder<byte[]> builder = client.newConsumer(Schema.BYTES);
+        builder.subscriptionName(config.getSubscriptionName());
+        return builder;
+    }
+
+    private static Authentication createAuthentication(BasePulsarConfig config) {
+        if (StringUtils.isBlank(config.getAuthPluginClassName())) {
+            return AuthenticationDisabled.INSTANCE;
+        }
+
+        if (StringUtils.isNotBlank(config.getAuthPluginClassName())) {
+            try {
+                return AuthenticationFactory.create(config.getAuthPluginClassName(), config.getAuthParams());
+            } catch (PulsarClientException.UnsupportedAuthenticationException e) {
+                throw new RuntimeException("Failed to create the authentication plug-in.", e);
+            }
+        } else {
+            throw new IllegalArgumentException("Authentication parameters are required when using authentication plug-in.");
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConsumerConfig.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConsumerConfig.java
new file mode 100644
index 000000000..e3024c61a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/PulsarConsumerConfig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
+
+// TODO: more field
+
+import org.apache.pulsar.shade.com.google.common.base.Preconditions;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+import java.io.Serializable;
+
+public class PulsarConsumerConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String subscriptionName;
+
+    private PulsarConsumerConfig(String subscriptionName) {
+        this.subscriptionName = subscriptionName;
+    }
+
+    public String getSubscriptionName() {
+        return subscriptionName;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+        private String subscriptionName;
+
+        private Builder() {
+        }
+
+        public Builder subscriptionName(String subscriptionName) {
+            this.subscriptionName = subscriptionName;
+            return this;
+        }
+
+        public PulsarConsumerConfig build() {
+            Preconditions.checkArgument(StringUtils.isNotBlank(subscriptionName), "Pulsar subscription name is required.");
+            return new PulsarConsumerConfig(subscriptionName);
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
new file mode 100644
index 000000000..e86923220
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
+
+public class SourceProperties {
+
+    // Pulsar client API config prefix.
+    public static final String CLIENT_CONFIG_PREFIX = "pulsar.client.";
+    // Pulsar admin API config prefix.
+    public static final String ADMIN_CONFIG_PREFIX = "pulsar.admin.";
+
+    // --------------------------------------------------------------------------------------------
+    // The configuration for ClientConfigurationData part.
+    // All the configuration listed below should have the pulsar.client prefix.
+    // --------------------------------------------------------------------------------------------
+
+    public static final String PULSAR_SERVICE_URL = CLIENT_CONFIG_PREFIX + "serviceUrl";
+    public static final String PULSAR_AUTH_PLUGIN_CLASS_NAME = CLIENT_CONFIG_PREFIX + "authPluginClassName";
+    public static final String PULSAR_AUTH_PARAMS = CLIENT_CONFIG_PREFIX + "authParams";
+
+    // --------------------------------------------------------------------------------------------
+    // The configuration for ClientConfigurationData part.
+    // All the configuration listed below should have the pulsar.client prefix.
+    // --------------------------------------------------------------------------------------------
+
+    public static final String PULSAR_ADMIN_URL = ADMIN_CONFIG_PREFIX + "adminUrl";
+
+    // Pulsar source connector config prefix.
+    public static final String SOURCE_CONFIG_PREFIX = "pulsar.source.";
+    // Pulsar consumer API config prefix.
+    public static final String CONSUMER_CONFIG_PREFIX = "pulsar.consumer.";
+
+    // --------------------------------------------------------------------------------------------
+    // The configuration for ConsumerConfigurationData part.
+    // All the configuration listed below should have the pulsar.consumer prefix.
+    // --------------------------------------------------------------------------------------------
+
+    public static final String PULSAR_SUBSCRIPTION_NAME = CONSUMER_CONFIG_PREFIX + "subscriptionName";
+    public static final String PULSAR_SUBSCRIPTION_TYPE = CONSUMER_CONFIG_PREFIX + "subscriptionType";
+    public static final String PULSAR_SUBSCRIPTION_MODE = CONSUMER_CONFIG_PREFIX + "subscriptionMode";
+
+    // --------------------------------------------------------------------------------------------
+    // The configuration for pulsar source part.
+    // All the configuration listed below should have the pulsar.source prefix.
+    // --------------------------------------------------------------------------------------------
+
+    public static final String PULSAR_PARTITION_DISCOVERY_INTERVAL_MS = SOURCE_CONFIG_PREFIX + "partitionDiscoveryIntervalMs";
+    public static final String PULSAR_TOPIC = SOURCE_CONFIG_PREFIX + "topic";
+    public static final String PULSAR_TOPIC_PATTERN = SOURCE_CONFIG_PREFIX + "topic.pattern";
+    public static final String PULSAR_POLL_TIMEOUT = SOURCE_CONFIG_PREFIX + "poll.timeout";
+    public static final String PULSAR_POLL_INTERVAL = SOURCE_CONFIG_PREFIX + "poll.interval";
+    public static final String PULSAR_BATCH_SIZE = SOURCE_CONFIG_PREFIX + "batch.size";
+    public static final String PULSAR_CURSOR_START_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.start.mode";
+    public static final String PULSAR_CURSOR_START_RESET_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.start.reset.mode";
+    public static final String PULSAR_CURSOR_START_TIMESTAMP = SOURCE_CONFIG_PREFIX + "scan.cursor.start.timestamp";
+    public static final String PULSAR_CURSOR_START_ID = SOURCE_CONFIG_PREFIX + "scan.cursor.start.id";
+    public static final String PULSAR_CURSOR_STOP_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.stop.mode";
+    public static final String PULSAR_CURSOR_STOP_TIMESTAMP = SOURCE_CONFIG_PREFIX + "scan.cursor.stop.timestamp";
+
+    /**
+     * Startup mode for the Kafka consumer, see {@link #PULSAR_CURSOR_START_MODE}.
+     */
+    public enum StartMode {
+        /**
+         * Start from the earliest cursor possible.
+         */
+        EARLIEST,
+        /**
+         * "Start from the latest cursor."
+         */
+        LATEST,
+        /**
+         * Start from committed cursors in a specific consumer subscription.
+         */
+        SUBSCRIPTION,
+        /**
+         * Start from user-supplied timestamp for each partition.
+         */
+        TIMESTAMP,
+        /**
+         * Start from user-supplied specific cursors for each partition.
+         */
+        SPECIFIC;
+    }
+
+    /**
+     * Startup mode for the Kafka consumer, see {@link #PULSAR_CURSOR_START_MODE}.
+     */
+    public enum StopMode {
+        /**
+         * "Start from the latest cursor."
+         */
+        LATEST,
+        /**
+         * Start from user-supplied timestamp for each partition.
+         */
+        TIMESTAMP,
+        /**
+         * Start from user-supplied specific cursors for each partition.
+         */
+        SPECIFIC,
+        NEVER;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
new file mode 100644
index 000000000..6e2b724de
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
+
+import static org.apache.seatunnel.common.PropertiesUtil.getEnum;
+import static org.apache.seatunnel.common.PropertiesUtil.setOption;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_ADMIN_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_AUTH_PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_AUTH_PLUGIN_CLASS_NAME;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_BATCH_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_START_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_START_RESET_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_START_TIMESTAMP;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_STOP_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_STOP_TIMESTAMP;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_POLL_INTERVAL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_POLL_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_SUBSCRIPTION_NAME;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_TOPIC_PATTERN;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode.LATEST;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StopMode.NEVER;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelContext;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicListDiscoverer;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+@AutoService(SeaTunnelSource.class)
+public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit, PulsarSplitEnumeratorState> {
+    public static final String IDENTIFIER = "pulsar";
+    private DeserializationSchema<T> deserialization;
+    private SeaTunnelContext seaTunnelContext;
+
+    private PulsarAdminConfig adminConfig;
+    private PulsarClientConfig clientConfig;
+    private PulsarConsumerConfig consumerConfig;
+    private PulsarDiscoverer partitionDiscoverer;
+    private long partitionDiscoveryIntervalMs;
+    private StartCursor startCursor;
+    private StopCursor stopCursor;
+
+    protected int pollTimeout;
+    protected long pollInterval;
+    protected int batchSize;
+
+    @Override
+    public String getPluginName() {
+        return IDENTIFIER;
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(config, PULSAR_SUBSCRIPTION_NAME, PULSAR_SERVICE_URL, PULSAR_ADMIN_URL);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+        }
+
+        // admin config
+        PulsarAdminConfig.Builder adminConfigBuilder = PulsarAdminConfig.builder()
+            .adminUrl(config.getString(PULSAR_ADMIN_URL));
+        setOption(config, PULSAR_AUTH_PLUGIN_CLASS_NAME, config::getString, adminConfigBuilder::authPluginClassName);
+        setOption(config, PULSAR_AUTH_PARAMS, config::getString, adminConfigBuilder::authParams);
+        this.adminConfig = adminConfigBuilder.build();
+
+        // client config
+        PulsarClientConfig.Builder clientConfigBuilder = PulsarClientConfig.builder()
+            .serviceUrl(config.getString(PULSAR_SERVICE_URL));
+        setOption(config, PULSAR_AUTH_PLUGIN_CLASS_NAME, config::getString, clientConfigBuilder::authPluginClassName);
+        setOption(config, PULSAR_AUTH_PARAMS, config::getString, clientConfigBuilder::authParams);
+        this.clientConfig = clientConfigBuilder.build();
+
+        // consumer config
+        PulsarConsumerConfig.Builder consumerConfigBuilder = PulsarConsumerConfig.builder()
+            .subscriptionName(config.getString(PULSAR_SERVICE_URL));
+        this.consumerConfig = consumerConfigBuilder.build();
+
+        // source properties
+        setOption(config,
+            PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,
+            30000L,
+            config::getLong,
+            v -> this.partitionDiscoveryIntervalMs = v);
+        setOption(config,
+            PULSAR_POLL_TIMEOUT,
+            100,
+            config::getInt,
+            v -> this.pollTimeout = v);
+        setOption(config,
+            PULSAR_POLL_INTERVAL,
+            50L,
+            config::getLong,
+            v -> this.pollInterval = v);
+        setOption(config,
+            PULSAR_BATCH_SIZE,
+            500,
+            config::getInt,
+            v -> this.batchSize = v);
+
+        setStartCursor(config);
+        setStopCursor(config);
+        setPartitionDiscoverer(config);
+        setDeserialization(config);
+
+        if ((partitionDiscoverer instanceof TopicPatternDiscoverer)
+            && partitionDiscoveryIntervalMs > 0
+            && Boundedness.BOUNDED == stopCursor.getBoundedness()) {
+            throw new IllegalArgumentException("Bounded streams do not support dynamic partition discovery.");
+        }
+    }
+
+    private void setStartCursor(Config config) {
+        StartMode startMode = getEnum(config, PULSAR_CURSOR_START_MODE, StartMode.class, LATEST);
+        switch (startMode) {
+            case EARLIEST:
+                this.startCursor = StartCursor.earliest();
+                break;
+            case LATEST:
+                this.startCursor = StartCursor.latest();
+                break;
+            case SUBSCRIPTION:
+                SubscriptionStartCursor.CursorResetStrategy resetStrategy = getEnum(config,
+                    PULSAR_CURSOR_START_RESET_MODE,
+                    SubscriptionStartCursor.CursorResetStrategy.class,
+                    SubscriptionStartCursor.CursorResetStrategy.LATEST);
+                this.startCursor = StartCursor.subscription(resetStrategy);
+                break;
+            case TIMESTAMP:
+                if (StringUtils.isBlank(config.getString(PULSAR_CURSOR_START_TIMESTAMP))) {
+                    throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", PULSAR_CURSOR_START_TIMESTAMP, PULSAR_CURSOR_START_MODE));
+                }
+                setOption(config, PULSAR_CURSOR_START_TIMESTAMP, config::getLong, timestamp -> this.startCursor = StartCursor.timestamp(timestamp));
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("The %s mode is not supported.", startMode));
+        }
+    }
+
+    private void setStopCursor(Config config) {
+        SourceProperties.StopMode stopMode = getEnum(config, PULSAR_CURSOR_STOP_MODE, SourceProperties.StopMode.class, NEVER);
+        switch (stopMode) {
+            case LATEST:
+                this.stopCursor = StopCursor.latest();
+                break;
+            case NEVER:
+                this.stopCursor = StopCursor.never();
+                break;
+            case TIMESTAMP:
+                if (StringUtils.isBlank(config.getString(PULSAR_CURSOR_STOP_TIMESTAMP))) {
+                    throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", PULSAR_CURSOR_STOP_TIMESTAMP, PULSAR_CURSOR_STOP_MODE));
+                }
+                setOption(config, PULSAR_CURSOR_START_TIMESTAMP, config::getLong, timestamp -> this.stopCursor = StopCursor.timestamp(timestamp));
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("The %s mode is not supported.", stopMode));
+        }
+    }
+
+    private void setPartitionDiscoverer(Config config) {
+        String topic = config.getString(PULSAR_TOPIC);
+        if (StringUtils.isNotBlank(topic)) {
+            this.partitionDiscoverer = new TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ",")));
+        }
+        String topicPattern = config.getString(PULSAR_TOPIC_PATTERN);
+        if (StringUtils.isNotBlank(topicPattern)) {
+            if (this.partitionDiscoverer != null) {
+                throw new IllegalArgumentException(String.format("The properties '%s' and '%s' is exclusive.", PULSAR_TOPIC, PULSAR_TOPIC_PATTERN));
+            }
+            this.partitionDiscoverer = new TopicPatternDiscoverer(Pattern.compile(topicPattern));
+        }
+        if (this.partitionDiscoverer == null) {
+            throw new IllegalArgumentException(String.format("The properties '%s' or '%s' is required.", PULSAR_TOPIC, PULSAR_TOPIC_PATTERN));
+        }
+    }
+
+    private void setDeserialization(Config config) {
+        String format = config.getString("format");
+        // TODO: json format
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return this.stopCursor instanceof NeverStopCursor ? Boundedness.UNBOUNDED : Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SeaTunnelContext getSeaTunnelContext() {
+        return this.seaTunnelContext;
+    }
+
+    @Override
+    public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
+        this.seaTunnelContext = seaTunnelContext;
+    }
+
+    @Override
+    public SeaTunnelDataType<T> getProducedType() {
+        return deserialization.getProducedType();
+    }
+
+    @Override
+    public SourceReader<T, PulsarPartitionSplit> createReader(SourceReader.Context readerContext) throws Exception {
+        return new PulsarSourceReader<>(readerContext,
+            clientConfig,
+            consumerConfig,
+            startCursor,
+            deserialization,
+            pollTimeout,
+            pollInterval,
+            batchSize);
+    }
+
+    @Override
+    public SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> createEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> enumeratorContext) throws Exception {
+        return new PulsarSplitEnumerator(
+            enumeratorContext,
+            adminConfig,
+            partitionDiscoverer,
+            partitionDiscoveryIntervalMs,
+            startCursor,
+            stopCursor,
+            consumerConfig.getSubscriptionName());
+    }
+
+    @Override
+    public SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> restoreEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> enumeratorContext, PulsarSplitEnumeratorState checkpointState) throws Exception {
+        return new PulsarSplitEnumerator(
+            enumeratorContext,
+            adminConfig,
+            partitionDiscoverer,
+            partitionDiscoveryIntervalMs,
+            startCursor,
+            stopCursor,
+            consumerConfig.getSubscriptionName(),
+            checkpointState.assignedPartitions());
+    }
+
+    @Override
+    public Serializer<PulsarSplitEnumeratorState> getEnumeratorStateSerializer() {
+        return new DefaultSerializer<>();
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
new file mode 100644
index 000000000..05ff4c236
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.SubscriptionStartCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.LatestMessageStopCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class PulsarSplitEnumerator implements SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSplitEnumerator.class);
+
+    private final SourceSplitEnumerator.Context<PulsarPartitionSplit> context;
+    private final PulsarAdminConfig adminConfig;
+    private final PulsarDiscoverer partitionDiscoverer;
+    private final long partitionDiscoveryIntervalMs;
+    private final StartCursor startCursor;
+    private final StopCursor stopCursor;
+
+    /**
+     * The consumer group id used for this PulsarSource.
+     */
+    private final String subscriptionName;
+
+    /**
+     * Partitions that have been assigned to readers.
+     */
+    private final Set<TopicPartition> assignedPartitions;
+    /**
+     * The discovered and initialized partition splits that are waiting for owner reader to be
+     * ready.
+     */
+    private final Map<Integer, Set<PulsarPartitionSplit>> pendingPartitionSplits;
+
+    private PulsarAdmin pulsarAdmin;
+
+    // This flag will be marked as true if periodically partition discovery is disabled AND the
+    // initializing partition discovery has finished.
+    private boolean noMoreNewPartitionSplits = false;
+
+    public PulsarSplitEnumerator(
+        SourceSplitEnumerator.Context<PulsarPartitionSplit> context,
+        PulsarAdminConfig adminConfig,
+        PulsarDiscoverer partitionDiscoverer,
+        long partitionDiscoveryIntervalMs,
+        StartCursor startCursor,
+        StopCursor stopCursor,
+        String subscriptionName) {
+        this(
+            context,
+            adminConfig,
+            partitionDiscoverer,
+            partitionDiscoveryIntervalMs,
+            startCursor,
+            stopCursor,
+            subscriptionName,
+            Collections.emptySet());
+    }
+
+    public PulsarSplitEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> context,
+                                 PulsarAdminConfig adminConfig,
+                                 PulsarDiscoverer partitionDiscoverer,
+                                 long partitionDiscoveryIntervalMs,
+                                 StartCursor startCursor,
+                                 StopCursor stopCursor,
+                                 String subscriptionName,
+                                 Set<TopicPartition> assignedPartitions) {
+        if ((partitionDiscoverer instanceof TopicPatternDiscoverer)
+            && partitionDiscoveryIntervalMs > 0
+            && Boundedness.BOUNDED == stopCursor.getBoundedness()) {
+            throw new IllegalArgumentException("Bounded streams do not support dynamic partition discovery.");
+        }
+        this.context = context;
+        this.adminConfig = adminConfig;
+        this.partitionDiscoverer = partitionDiscoverer;
+        this.partitionDiscoveryIntervalMs = partitionDiscoveryIntervalMs;
+        this.startCursor = startCursor;
+        this.stopCursor = stopCursor;
+        this.subscriptionName = subscriptionName;
+        this.assignedPartitions = new HashSet<>(assignedPartitions);
+        this.pendingPartitionSplits = new HashMap<>();
+    }
+
+    @Override
+    public void open() {
+        this.pulsarAdmin = PulsarConfigUtil.createAdmin(adminConfig);
+    }
+
+    @Override
+    public void run() throws Exception {
+        Set<TopicPartition> subscribedTopicPartitions = partitionDiscoverer.getSubscribedTopicPartitions(pulsarAdmin);
+        checkPartitionChanges(subscribedTopicPartitions);
+    }
+
+    private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions) {
+        // Append the partitions into current assignment state.
+        final Set<TopicPartition> newPartitions = getNewPartitions(fetchedPartitions);
+        if (newPartitions.isEmpty()) {
+            return;
+        }
+        if (partitionDiscoveryIntervalMs < 0 && !noMoreNewPartitionSplits) {
+            LOG.debug("Partition discovery is disabled.");
+            noMoreNewPartitionSplits = true;
+        }
+        List<PulsarPartitionSplit> newSplits = newPartitions.stream()
+            .map(this::createPulsarPartitionSplit)
+            .collect(Collectors.toList());
+        addPartitionSplitChangeToPendingAssignments(newSplits);
+        assignPendingPartitionSplits(context.registeredReaders());
+    }
+
+    private PulsarPartitionSplit createPulsarPartitionSplit(TopicPartition partition) {
+        StopCursor partitionStopCursor = stopCursor.copy();
+        PulsarPartitionSplit split = new PulsarPartitionSplit(partition, partitionStopCursor);
+        if (partitionStopCursor instanceof LatestMessageStopCursor) {
+            ((LatestMessageStopCursor) partitionStopCursor).prepare(pulsarAdmin, partition);
+        }
+        if (startCursor instanceof SubscriptionStartCursor) {
+            ((SubscriptionStartCursor) startCursor).ensureSubscription(subscriptionName, partition, pulsarAdmin);
+        }
+        return split;
+    }
+
+    private Set<TopicPartition> getNewPartitions(Set<TopicPartition> fetchedPartitions) {
+        Consumer<TopicPartition> dedupOrMarkAsRemoved = fetchedPartitions::remove;
+        assignedPartitions.forEach(dedupOrMarkAsRemoved);
+        pendingPartitionSplits.forEach(
+            (reader, splits) ->
+                splits.forEach(
+                    split -> dedupOrMarkAsRemoved.accept(split.getPartition())));
+
+        if (!fetchedPartitions.isEmpty()) {
+            LOG.info("Discovered new partitions: {}", fetchedPartitions);
+        }
+
+        return fetchedPartitions;
+    }
+
+    private void addPartitionSplitChangeToPendingAssignments(
+        Collection<PulsarPartitionSplit> newPartitionSplits) {
+        int numReaders = context.currentParallelism();
+        for (PulsarPartitionSplit split : newPartitionSplits) {
+            int ownerReader = getSplitOwner(split.getPartition(), numReaders);
+            pendingPartitionSplits
+                .computeIfAbsent(ownerReader, r -> new HashSet<>())
+                .add(split);
+        }
+        LOG.debug(
+            "Assigned {} to {} readers of subscription {}.",
+            newPartitionSplits,
+            numReaders,
+            subscriptionName);
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    static int getSplitOwner(TopicPartition tp, int numReaders) {
+        int startIndex = ((tp.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
+
+        // here, the assumption is that the id of pulsar partitions are always ascending
+        // starting from 0, and therefore can be used directly as the offset clockwise from the
+        // start index
+        return (startIndex + tp.getPartition()) % numReaders;
+    }
+
+    private void assignPendingPartitionSplits(Set<Integer> pendingReaders) {
+        // Check if there's any pending splits for given readers
+        for (int pendingReader : pendingReaders) {
+
+            // Remove pending assignment for the reader
+            final Set<PulsarPartitionSplit> pendingAssignmentForReader =
+                pendingPartitionSplits.remove(pendingReader);
+
+            if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
+
+                // Mark pending partitions as already assigned
+                pendingAssignmentForReader.forEach(
+                    split -> assignedPartitions.add(split.getPartition()));
+
+                // Assign pending splits to reader
+                LOG.info("Assigning splits to readers {}", pendingAssignmentForReader);
+                context.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader));
+            }
+        }
+
+        // If periodically partition discovery is disabled and the initializing discovery has done,
+        // signal NoMoreSplitsEvent to pending readers
+        if (noMoreNewPartitionSplits && stopCursor.getBoundedness() == Boundedness.BOUNDED) {
+            LOG.debug(
+                "No more PulsarPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {}"
+                    + " in subscription {}.",
+                pendingReaders,
+                subscriptionName);
+            pendingReaders.forEach(context::signalNoMoreSplits);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (pulsarAdmin != null) {
+            pulsarAdmin.close();
+        }
+    }
+
+    @Override
+    public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) {
+        addPartitionSplitChangeToPendingAssignments(splits);
+
+        // If the failed subtask has already restarted, we need to assign pending splits to it
+        if (context.registeredReaders().contains(subtaskId)) {
+            assignPendingPartitionSplits(Collections.singleton(subtaskId));
+        }
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingPartitionSplits.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        // Do nothing because Pulsar source push split.
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        LOG.debug(
+            "Adding reader {} to PulsarSourceEnumerator for subscription {}.",
+            subtaskId,
+            subscriptionName);
+        assignPendingPartitionSplits(Collections.singleton(subtaskId));
+    }
+
+    @Override
+    public PulsarSplitEnumeratorState snapshotState(long checkpointId) throws Exception {
+        return new PulsarSplitEnumeratorState(assignedPartitions);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // nothing
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java
new file mode 100644
index 000000000..8df46e7b4
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator;
+
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class PulsarSplitEnumeratorState implements Serializable {
+    private final Set<TopicPartition> assignedPartitions;
+
+    PulsarSplitEnumeratorState(Set<TopicPartition> assignedPartitions) {
+        this.assignedPartitions = assignedPartitions;
+    }
+
+    public Set<TopicPartition> assignedPartitions() {
+        return assignedPartitions;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
new file mode 100644
index 000000000..4e703cec9
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/MessageIdStartCursor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+/**
+ * This cursor would left pulsar start consuming from a specific message id.
+ */
+public class MessageIdStartCursor implements StartCursor {
+    private static final long serialVersionUID = 1L;
+
+    private final MessageId messageId;
+
+    /**
+     * The default {@code inclusive} behavior should be controlled in {@link
+     * ConsumerBuilder#startMessageIdInclusive}. But pulsar has a bug and don't support this
+     * currently. We have to use {@code entry + 1} policy for consuming the next available message.
+     * If the message id entry is not valid. Pulsar would automatically find next valid message id.
+     * Please referer <a
+     * href="https://github.com/apache/pulsar/blob/36d5738412bb1ed9018178007bf63d9202b675db/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java#L1151">this
+     * code</a> for understanding pulsar internal logic.
+     *
+     * @param messageId The message id for start position.
+     * @param inclusive Should we include the start message id in consuming result.
+     */
+    public MessageIdStartCursor(MessageId messageId, boolean inclusive) {
+        if (inclusive) {
+            this.messageId = messageId;
+        } else {
+            checkArgument(messageId instanceof MessageIdImpl, "We only support normal message id and batch message id.");
+            MessageIdImpl id = (MessageIdImpl) messageId;
+            this.messageId =
+                new MessageIdImpl(
+                    id.getLedgerId(), id.getEntryId() + 1, id.getPartitionIndex());
+        }
+    }
+
+    @Override
+    public void seekPosition(Consumer<?> consumer) throws PulsarClientException {
+        consumer.seek(messageId);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java
new file mode 100644
index 000000000..fca1a9dab
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/StartCursor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.io.Serializable;
+
+/**
+ * A interface for users to specify the start position of a pulsar subscription. Since it would be
+ * serialized into split. The implementation for this interface should be well considered. I don't
+ * recommend adding extra internal state for this implementation.
+ *
+ * <p>This class would be used only for {@link SubscriptionType#Exclusive} and {@link
+ * SubscriptionType#Failover}.
+ */
+@FunctionalInterface
+public interface StartCursor extends Serializable {
+
+    /**
+     * Helper method for seek the right position for given pulsar consumer.
+     */
+    void seekPosition(Consumer<?> consumer) throws PulsarClientException;
+
+    // --------------------------- Static Factory Methods -----------------------------
+
+    static StartCursor earliest() {
+        return fromMessageId(MessageId.earliest);
+    }
+
+    static StartCursor latest() {
+        return fromMessageId(MessageId.latest);
+    }
+
+    static StartCursor subscription() {
+        return new SubscriptionStartCursor();
+    }
+
+    static StartCursor subscription(SubscriptionStartCursor.CursorResetStrategy cursorResetStrategy) {
+        return new SubscriptionStartCursor(cursorResetStrategy);
+    }
+
+    static StartCursor fromMessageId(MessageId messageId) {
+        return fromMessageId(messageId, true);
+    }
+
+    /**
+     * @param messageId Find the available message id and start consuming from it.
+     * @param inclusive {@code true} would include the given message id.
+     */
+    static StartCursor fromMessageId(MessageId messageId, boolean inclusive) {
+        return new MessageIdStartCursor(messageId, inclusive);
+    }
+
+    static StartCursor timestamp(long timestamp) {
+        return new TimestampStartCursor(timestamp);
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
new file mode 100644
index 000000000..e62d24079
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/SubscriptionStartCursor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
+
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+public class SubscriptionStartCursor implements StartCursor {
+    private static final long serialVersionUID = 1L;
+
+    private final CursorResetStrategy cursorResetStrategy;
+
+    public SubscriptionStartCursor() {
+        this.cursorResetStrategy = CursorResetStrategy.LATEST;
+    }
+
+    public SubscriptionStartCursor(CursorResetStrategy cursorResetStrategy) {
+        this.cursorResetStrategy = cursorResetStrategy;
+    }
+
+    public void ensureSubscription(String subscription, TopicPartition partition, PulsarAdmin pulsarAdmin) {
+        try {
+            if (pulsarAdmin.topics()
+                .getSubscriptions(partition.getFullTopicName())
+                .contains(subscription)) {
+                return;
+            }
+            pulsarAdmin.topics().createSubscription(partition.getFullTopicName(), subscription, CursorResetStrategy.EARLIEST == cursorResetStrategy ? MessageId.earliest : MessageId.latest);
+        } catch (PulsarAdminException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void seekPosition(Consumer<?> consumer) throws PulsarClientException {
+        // nothing
+    }
+
+    public enum CursorResetStrategy {
+        LATEST, EARLIEST
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
new file mode 100644
index 000000000..d89cd40e5
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/start/TimestampStartCursor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * This cursor would left pulsar start consuming from a specific timestamp.
+ */
+public class TimestampStartCursor implements StartCursor {
+    private static final long serialVersionUID = 5170578885838095320L;
+
+    private final long timestamp;
+
+    public TimestampStartCursor(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public void seekPosition(Consumer<?> consumer) throws PulsarClientException {
+        consumer.seek(timestamp);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
new file mode 100644
index 000000000..14aeeff28
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/LatestMessageStopCursor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop;
+
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+/**
+ * A stop cursor that initialize the position to the latest message id. The offsets initialization
+ * are taken care of by the {@code PulsarPartitionSplitReaderBase} instead of by the {@code
+ * PulsarSourceEnumerator}.
+ */
+public class LatestMessageStopCursor implements StopCursor {
+    private static final long serialVersionUID = 1L;
+
+    private MessageId messageId;
+
+    public void prepare(PulsarAdmin admin, TopicPartition partition) {
+        if (messageId == null) {
+            String topic = partition.getFullTopicName();
+            try {
+                messageId = admin.topics().getLastMessageId(topic);
+            } catch (PulsarAdminException e) {
+                throw new RuntimeException("Failed to get the last cursor", e);
+            }
+        }
+    }
+
+    @Override
+    public boolean shouldStop(Message<?> message) {
+        MessageId id = message.getMessageId();
+        return id.compareTo(messageId) >= 0;
+    }
+
+    @Override
+    public StopCursor copy() {
+        return new LatestMessageStopCursor();
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
new file mode 100644
index 000000000..3e1628027
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/MessageIdStopCursor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+/**
+ * Stop consuming message at a given message id. We use the {@link MessageId#compareTo(Object)} for
+ * compare the consuming message with the given message id.
+ */
+public class MessageIdStopCursor implements StopCursor {
+    private static final long serialVersionUID = 1L;
+
+    private final MessageId messageId;
+
+    private final boolean exclusive;
+
+    public MessageIdStopCursor(MessageId messageId) {
+        this(messageId, true);
+    }
+
+    public MessageIdStopCursor(MessageId messageId, boolean exclusive) {
+        this.messageId = messageId;
+        this.exclusive = exclusive;
+    }
+
+    @Override
+    public boolean shouldStop(Message<?> message) {
+        MessageId id = message.getMessageId();
+        if (exclusive) {
+            return id.compareTo(messageId) > 0;
+        } else {
+            return id.compareTo(messageId) >= 0;
+        }
+    }
+
+    @Override
+    public StopCursor copy() {
+        return new MessageIdStopCursor(messageId, exclusive);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
new file mode 100644
index 000000000..4b0da5a1a
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/NeverStopCursor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop;
+
+import org.apache.seatunnel.api.source.Boundedness;
+
+import org.apache.pulsar.client.api.Message;
+
+/**
+ * A implementation which wouldn't stop forever.
+ */
+public class NeverStopCursor implements StopCursor {
+    private static final long serialVersionUID = 1L;
+    public static final NeverStopCursor INSTANCE = new NeverStopCursor();
+
+    private NeverStopCursor() {
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.UNBOUNDED;
+    }
+
+    @Override
+    public boolean shouldStop(Message<?> message) {
+        return false;
+    }
+
+    @Override
+    public StopCursor copy() {
+        return INSTANCE;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/StopCursor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/StopCursor.java
new file mode 100644
index 000000000..8ef843845
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/StopCursor.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop;
+
+import org.apache.seatunnel.api.source.Boundedness;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+import java.io.Serializable;
+
+public interface StopCursor extends Serializable {
+
+    /**
+     * Determine whether to pause consumption on the current message by the returned boolean value.
+     * The message presented in method argument wouldn't be consumed if the return result is true.
+     */
+    boolean shouldStop(Message<?> message);
+
+    default Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    StopCursor copy();
+
+    // --------------------------- Static Factory Methods -----------------------------
+
+    static StopCursor never() {
+        return NeverStopCursor.INSTANCE;
+    }
+
+    static StopCursor latest() {
+        return new LatestMessageStopCursor();
+    }
+
+    static StopCursor atMessageId(MessageId messageId) {
+        return new MessageIdStopCursor(messageId);
+    }
+
+    static StopCursor afterMessageId(MessageId messageId) {
+        return new MessageIdStopCursor(messageId, false);
+    }
+
+    static StopCursor timestamp(long timestamp) {
+        return new TimestampStopCursor(timestamp);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java
new file mode 100644
index 000000000..c2a48a702
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/cursor/stop/TimestampStopCursor.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop;
+
+import org.apache.pulsar.client.api.Message;
+
+/**
+ * Stop consuming message at the given event time.
+ */
+public class TimestampStopCursor implements StopCursor {
+    private static final long serialVersionUID = 1L;
+
+    private final long timestamp;
+
+    public TimestampStopCursor(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public boolean shouldStop(Message<?> message) {
+        return message.getEventTime() >= timestamp;
+    }
+
+    @Override
+    public StopCursor copy() {
+        return new TimestampStopCursor(timestamp);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/PulsarDiscoverer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/PulsarDiscoverer.java
new file mode 100644
index 000000000..a45930a71
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/PulsarDiscoverer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer;
+
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public interface PulsarDiscoverer extends Serializable {
+    Set<TopicPartition> getSubscribedTopicPartitions(
+        PulsarAdmin pulsarAdmin);
+
+    static List<TopicPartition> toTopicPartitions(String topicName, int partitionSize) {
+        if (partitionSize == PartitionedTopicMetadata.NON_PARTITIONED) {
+            // For non-partitioned topic.
+            return Collections.singletonList(new TopicPartition(topicName, -1));
+        } else {
+            return IntStream.range(0, partitionSize)
+                .boxed()
+                .map(partitionId -> new TopicPartition(topicName, partitionId))
+                .collect(Collectors.toList());
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java
new file mode 100644
index 000000000..b362446d0
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicListDiscoverer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer;
+
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * the implements of consuming multiple topics.
+ */
+public class TopicListDiscoverer implements PulsarDiscoverer {
+
+    private final List<String> topics;
+
+    public TopicListDiscoverer(List<String> topics) {
+        this.topics = topics;
+    }
+
+    @Override
+    public Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin pulsarAdmin) {
+        return topics.parallelStream()
+            .map(topicName -> {
+                String completeTopicName = TopicName.get(topicName).getPartitionedTopicName();
+                try {
+                    PartitionedTopicMetadata metadata =
+                        pulsarAdmin.topics().getPartitionedTopicMetadata(completeTopicName);
+                    return PulsarDiscoverer.toTopicPartitions(topicName, metadata.partitions);
+                } catch (PulsarAdminException e) {
+                    // This method would cause the failure for subscriber.
+                    throw new IllegalStateException(e);
+                }
+            })
+            .filter(Objects::nonNull)
+            .flatMap(Collection::stream)
+            .collect(Collectors.toSet());
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java
new file mode 100644
index 000000000..d5eaedac5
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/discoverer/TopicPatternDiscoverer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer;
+
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class TopicPatternDiscoverer implements PulsarDiscoverer {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(TopicPatternDiscoverer.class);
+
+    private final Pattern topicPattern;
+    private final RegexSubscriptionMode subscriptionMode;
+    private final String namespace;
+
+    public TopicPatternDiscoverer(Pattern topicPattern) {
+        this.topicPattern = topicPattern;
+
+        this.subscriptionMode = RegexSubscriptionMode.AllTopics;
+        // Extract the namespace from topic pattern regex.
+        // If no namespace provided in the regex, we would directly use "default" as the namespace.
+        TopicName destination = TopicName.get(topicPattern.toString());
+        NamespaceName namespaceName = destination.getNamespaceObject();
+        this.namespace = namespaceName.toString();
+    }
+
+    @Override
+    public Set<TopicPartition> getSubscribedTopicPartitions(PulsarAdmin pulsarAdmin) {
+        LOG.debug("Fetching descriptions for all topics on pulsar cluster");
+        try {
+            return pulsarAdmin
+                .namespaces()
+                .getTopics(namespace)
+                .parallelStream()
+                .filter(this::matchesSubscriptionMode)
+                .filter(topic -> topicPattern.matcher(topic).find())
+                .map(topicName -> {
+                    String completeTopicName = TopicName.get(topicName).getPartitionedTopicName();
+                    try {
+                        PartitionedTopicMetadata metadata =
+                            pulsarAdmin.topics().getPartitionedTopicMetadata(completeTopicName);
+                        return PulsarDiscoverer.toTopicPartitions(topicName, metadata.partitions);
+                    } catch (PulsarAdminException e) {
+                        // This method would cause the failure for subscriber.
+                        throw new IllegalStateException(e);
+                    }
+                }).filter(Objects::nonNull)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toSet());
+        } catch (PulsarAdminException e) {
+            // This method would cause the failure for subscriber.
+            throw new IllegalStateException(e);
+        }
+    }
+
+    private boolean matchesSubscriptionMode(String topic) {
+        TopicName topicName = TopicName.get(topic);
+        // Filter the topic persistence.
+        switch (subscriptionMode) {
+            case PersistentOnly:
+                return topicName.isPersistent();
+            case NonPersistentOnly:
+                return !topicName.isPersistent();
+            default:
+                // RegexSubscriptionMode.AllTopics
+                return true;
+        }
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/topic/TopicPartition.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/topic/TopicPartition.java
new file mode 100644
index 000000000..de9110125
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/topic/TopicPartition.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic;
+
+import org.apache.pulsar.common.naming.TopicName;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Basic information about a topic. If the topic is not partitioned, the partition number will be -1.
+ */
+public class TopicPartition implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private int hash = 0;
+    /**
+     * The topic name of the pulsar. It would be a full topic name, if your don't provide the tenant
+     * and namespace, we would add them automatically.
+     */
+    private final String topic;
+
+    /**
+     * Index of partition for the topic. It would be natural number for partitioned topic with a
+     * non-key_shared subscription.
+     */
+    private final int partition;
+
+    public TopicPartition(String topic, int partition) {
+        this.topic = topic;
+        this.partition = partition;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public String getFullTopicName() {
+        if (partition < 0) {
+            return topic;
+        }
+        return TopicName.get(topic).getPartition(partition).toString();
+    }
+
+    @Override
+    public int hashCode() {
+        if (hash != 0) {
+            return hash;
+        }
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + partition;
+        result = prime * result + Objects.hashCode(topic);
+        this.hash = result;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        TopicPartition other = (TopicPartition) obj;
+        return partition == other.partition && Objects.equals(topic, other.topic);
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/Handover.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/Handover.java
new file mode 100644
index 000000000..04b8f0a24
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/Handover.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public final class Handover<T> implements Closeable {
+    private final Object lock = new Object();
+    private final LinkedBlockingQueue<T> blockingQueue =
+        new LinkedBlockingQueue<>();
+    private Throwable error;
+
+    public boolean isEmpty() {
+        return blockingQueue.isEmpty();
+    }
+
+    public Optional<T> pollNext() throws Exception {
+        if (error != null) {
+            rethrowException(error, error.getMessage());
+        } else if (!isEmpty()) {
+            return Optional.ofNullable(blockingQueue.poll());
+        }
+        return Optional.empty();
+    }
+
+    public void produce(final T element)
+        throws InterruptedException, ClosedException {
+        if (error != null) {
+            throw new ClosedException();
+        }
+        blockingQueue.put(element);
+    }
+
+    public void reportError(Throwable t) {
+        checkNotNull(t);
+
+        synchronized (lock) {
+            // do not override the initial exception
+            if (error == null) {
+                error = t;
+            }
+            lock.notifyAll();
+        }
+    }
+
+    @Override
+    public void close() {
+        synchronized (lock) {
+            if (error == null) {
+                error = new ClosedException();
+            }
+            lock.notifyAll();
+        }
+    }
+
+    public static void rethrowException(Throwable t, String parentMessage) throws Exception {
+        if (t instanceof Error) {
+            throw (Error) t;
+        } else if (t instanceof Exception) {
+            throw (Exception) t;
+        } else {
+            throw new Exception(parentMessage, t);
+        }
+    }
+
+    public static final class ClosedException extends Exception {
+        private static final long serialVersionUID = 1L;
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
new file mode 100644
index 000000000..725f6551b
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceReader.class);
+    protected final SourceReader.Context context;
+    protected final PulsarClientConfig clientConfig;
+    protected final PulsarConsumerConfig consumerConfig;
+    protected final StartCursor startCursor;
+    protected final Handover<RecordWithSplitId> handover;
+
+    protected final Map<String, PulsarPartitionSplit> splitStates;
+    protected final Map<String, PulsarSplitReaderThread> splitReaders;
+    protected final SortedMap<Long, Map<String, MessageId>> pendingCursorsToCommit;
+    protected final Map<String, MessageId> pendingCursorsToFinish;
+    protected final Set<String> finishedSplits;
+
+    protected final DeserializationSchema<T> deserialization;
+
+    /**
+     * The maximum number of milliseconds to wait for a fetch batch.
+     */
+    protected final int pollTimeout;
+    protected final long pollInterval;
+    protected final int batchSize;
+
+    protected PulsarClient pulsarClient;
+    /**
+     * Indicating whether the SourceReader will be assigned more splits or not.
+     */
+    private boolean noMoreSplitsAssignment = false;
+
+    public PulsarSourceReader(SourceReader.Context context,
+                              PulsarClientConfig clientConfig,
+                              PulsarConsumerConfig consumerConfig,
+                              StartCursor startCursor,
+                              DeserializationSchema<T> deserialization,
+                              int pollTimeout,
+                              long pollInterval,
+                              int batchSize) {
+        this.context = context;
+        this.clientConfig = clientConfig;
+        this.consumerConfig = consumerConfig;
+        this.startCursor = startCursor;
+        this.deserialization = deserialization;
+        this.pollTimeout = pollTimeout;
+        this.pollInterval = pollInterval;
+        this.batchSize = batchSize;
+        this.splitStates = new HashMap<>();
+        this.splitReaders = new HashMap<>();
+        this.pendingCursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
+        this.pendingCursorsToFinish = Collections.synchronizedSortedMap(new TreeMap<>());
+        this.finishedSplits = new TreeSet<>();
+        this.handover = new Handover<>();
+    }
+
+    @Override
+    public void open() {
+        this.pulsarClient = PulsarConfigUtil.createClient(clientConfig);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (pulsarClient != null) {
+            pulsarClient.close();
+        }
+        splitReaders.values().forEach(reader -> {
+            try {
+                reader.close();
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to close the split reader thread.", e);
+            }
+        });
+    }
+
+    @Override
+    public void pollNext(Collector<T> output) throws Exception {
+        for (int i = 0; i < batchSize; i++) {
+            Optional<RecordWithSplitId> recordWithSplitId = handover.pollNext();
+            if (recordWithSplitId.isPresent()) {
+                final String splitId = recordWithSplitId.get().getSplitId();
+                final Message<byte[]> message = recordWithSplitId.get().getMessage();
+                splitStates.get(splitId).setLatestConsumedId(message.getMessageId());
+                deserialization.deserialize(message.getData(), output);
+            }
+            if (noMoreSplitsAssignment && finishedSplits.size() == splitStates.size()) {
+                context.signalNoMoreElement();
+                break;
+            }
+        }
+    }
+
+    @Override
+    public List<PulsarPartitionSplit> snapshotState(long checkpointId) throws Exception {
+        List<PulsarPartitionSplit> pendingSplit = splitStates.values().stream()
+            .map(PulsarPartitionSplit::copy)
+            .collect(Collectors.toList());
+        // Perform a snapshot for these splits.
+        int size = pendingSplit.size();
+        Map<String, MessageId> cursors =
+            pendingCursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>(size));
+        // Put the cursors of the active splits.
+        for (PulsarPartitionSplit split : pendingSplit) {
+            MessageId latestConsumedId = split.getLatestConsumedId();
+            if (latestConsumedId != null) {
+                cursors.put(split.splitId(), latestConsumedId);
+            }
+        }
+        return pendingSplit;
+    }
+
+    @Override
+    public void addSplits(List<PulsarPartitionSplit> splits) {
+        splits.forEach(split -> {
+            splitStates.put(split.splitId(), split);
+            PulsarSplitReaderThread splitReaderThread = createPulsarSplitReaderThread(split);
+            try {
+                splitReaderThread.open();
+                splitReaders.put(split.splitId(), splitReaderThread);
+                splitReaderThread.start();
+            } catch (PulsarClientException e) {
+                throw new RuntimeException("Failed to start the split reader thread.", e);
+            }
+        });
+    }
+
+    protected PulsarSplitReaderThread createPulsarSplitReaderThread(PulsarPartitionSplit split) {
+        return new PulsarSplitReaderThread(this,
+            split,
+            pulsarClient,
+            consumerConfig,
+            pollTimeout,
+            pollInterval,
+            startCursor,
+            handover);
+    }
+
+    public void handleNoMoreElements(String splitId, MessageId messageId) {
+        LOG.info("Reader received the split {} NoMoreElements event.", splitId);
+        pendingCursorsToFinish.put(splitId, messageId);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        LOG.info("Reader received NoMoreSplits event.");
+        this.noMoreSplitsAssignment = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        LOG.debug("Committing cursors for checkpoint {}", checkpointId);
+        Map<String, MessageId> pendingCursors = pendingCursorsToCommit.remove(checkpointId);
+        if (pendingCursors == null) {
+            LOG.debug(
+                "Cursors for checkpoint {} either do not exist or have already been committed.",
+                checkpointId);
+            return;
+        }
+        pendingCursors.forEach((splitId, messageId) -> {
+            if (finishedSplits.contains(splitId)) {
+                return;
+            }
+
+            splitReaders.get(splitId).committingCursor(messageId);
+
+            if (pendingCursorsToFinish.containsKey(splitId) &&
+                pendingCursorsToFinish.get(splitId).compareTo(messageId) == 0) {
+                finishedSplits.add(splitId);
+                try {
+                    splitReaders.get(splitId).close();
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to close the split reader thread.", e);
+                }
+            }
+        });
+    }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
new file mode 100644
index 000000000..f708a1759
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;
+
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class PulsarSplitReaderThread extends Thread implements Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarSplitReaderThread.class);
+    protected final PulsarSourceReader sourceReader;
+    protected final PulsarPartitionSplit split;
+    protected final PulsarClient pulsarClient;
+    protected final PulsarConsumerConfig consumerConfig;
+    /**
+     * The maximum number of milliseconds to wait for a fetch batch.
+     */
+    protected final int pollTimeout;
+    protected final long pollInterval;
+    protected final StartCursor startCursor;
+    protected final Handover<RecordWithSplitId> handover;
+    protected Consumer<byte[]> consumer;
+
+    /**
+     * Flag to mark the main work loop as alive.
+     */
+    private volatile boolean running;
+
+    public PulsarSplitReaderThread(PulsarSourceReader sourceReader,
+                                   PulsarPartitionSplit split,
+                                   PulsarClient pulsarClient,
+                                   PulsarConsumerConfig consumerConfig,
+                                   int pollTimeout,
+                                   long pollInterval,
+                                   StartCursor startCursor,
+                                   Handover<RecordWithSplitId> handover) {
+        this.sourceReader = sourceReader;
+        this.split = split;
+        this.pulsarClient = pulsarClient;
+        this.consumerConfig = consumerConfig;
+        this.pollTimeout = pollTimeout;
+        this.pollInterval = pollInterval;
+        this.startCursor = startCursor;
+        this.handover = handover;
+    }
+
+    public void open() throws PulsarClientException {
+        this.consumer = createPulsarConsumer(split);
+        if (split.getLatestConsumedId() == null) {
+            startCursor.seekPosition(consumer);
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            final StopCursor stopCursor = split.getStopCursor();
+            while (running) {
+                Message<byte[]> message = consumer.receive(pollTimeout, TimeUnit.MILLISECONDS);
+                if (message != null) {
+                    handover.produce(new RecordWithSplitId(message, split.splitId()));
+                    if (stopCursor.shouldStop(message)) {
+                        sourceReader.handleNoMoreElements(split.splitId(), message.getMessageId());
+                        break;
+                    }
+                }
+                Thread.sleep(pollInterval);
+            }
+        } catch (Throwable t) {
+            handover.reportError(t);
+        } finally {
+            // make sure the PulsarConsumer is closed
+            try {
+                consumer.close();
+            } catch (Throwable t) {
+                LOG.warn("Error while closing pulsar consumer", t);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        running = false;
+        if (consumer != null) {
+            consumer.close();
+        }
+    }
+
+    public void committingCursor(MessageId offsetsToCommit) {
+        if (consumer == null) {
+            consumer = createPulsarConsumer(split);
+        }
+        consumer.acknowledgeAsync(offsetsToCommit);
+    }
+
+    /**
+     * Create a specified {@link Consumer} by the given split information.
+     */
+    protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit split) {
+        ConsumerBuilder<byte[]> consumerBuilder =
+            PulsarConfigUtil.createConsumerBuilder(pulsarClient, consumerConfig);
+
+        consumerBuilder.topic(split.getPartition().getFullTopicName());
+
+        // Create the consumer configuration by using common utils.
+        try {
+            return consumerBuilder.subscribe();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException("Failed to create pulsar consumer:", e);
+        }
+    }
+
+}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/RecordWithSplitId.java
similarity index 55%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
copy to seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/RecordWithSplitId.java
index e9052bfe6..62e4c0777 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/PropertiesUtil.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/RecordWithSplitId.java
@@ -15,28 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.common;
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.pulsar.client.api.Message;
 
-import java.util.Properties;
+public final class RecordWithSplitId {
+    private final Message<byte[]> message;
+    private final String splitId;
 
-public final class PropertiesUtil {
+    public RecordWithSplitId(Message<byte[]> message, String splitId) {
+        this.message = message;
+        this.splitId = splitId;
+    }
 
-    private PropertiesUtil() {
+    public Message<byte[]> getMessage() {
+        return message;
     }
 
-    public static void setProperties(Config config, Properties properties, String prefix, boolean keepPrefix) {
-        config.entrySet().forEach(entry -> {
-            String key = entry.getKey();
-            Object value = entry.getValue().unwrapped();
-            if (key.startsWith(prefix)) {
-                if (keepPrefix) {
-                    properties.put(key, value);
-                } else {
-                    properties.put(key.substring(prefix.length()), value);
-                }
-            }
-        });
+    public String getSplitId() {
+        return splitId;
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/split/PulsarPartitionSplit.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/split/PulsarPartitionSplit.java
new file mode 100644
index 000000000..fed3f19b6
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/split/PulsarPartitionSplit.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.topic.TopicPartition;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.shade.com.google.common.base.Preconditions;
+import org.apache.pulsar.shade.javax.annotation.Nullable;
+
+import java.util.Objects;
+
+public class PulsarPartitionSplit implements SourceSplit {
+
+    private final TopicPartition partition;
+
+    private final StopCursor stopCursor;
+
+    @Nullable
+    private MessageId latestConsumedId;
+
+    public PulsarPartitionSplit(TopicPartition partition, StopCursor stopCursor) {
+        this(partition, stopCursor, null);
+    }
+
+    public PulsarPartitionSplit(
+        TopicPartition partition,
+        StopCursor stopCursor,
+        MessageId latestConsumedId) {
+        this.partition = Preconditions.checkNotNull(partition);
+        this.stopCursor = Preconditions.checkNotNull(stopCursor);
+        this.latestConsumedId = latestConsumedId;
+    }
+
+    public TopicPartition getPartition() {
+        return partition;
+    }
+
+    public StopCursor getStopCursor() {
+        return stopCursor;
+    }
+
+    @Nullable
+    public MessageId getLatestConsumedId() {
+        return latestConsumedId;
+    }
+
+    @Override
+    public String splitId() {
+        return partition.getFullTopicName();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PulsarPartitionSplit that = (PulsarPartitionSplit) o;
+        return partition.equals(that.partition);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(partition);
+    }
+
+    public void setLatestConsumedId(MessageId latestConsumedId) {
+        this.latestConsumedId = latestConsumedId;
+    }
+
+    public PulsarPartitionSplit copy() {
+        return new PulsarPartitionSplit(partition, stopCursor, latestConsumedId);
+    }
+}
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 57a4e7544..091b8ca9a 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -443,6 +443,7 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) Apache POI (org.apache.poi:poi-ooxml-schemas:4.1.2 - http://poi.apache.org/)
      (Apache License, Version 2.0) Apache POI (org.apache.poi:poi-ooxml:4.1.2 - http://poi.apache.org/)
      (Apache License, Version 2.0) Apache POI (org.apache.poi:poi:4.1.2 - http://poi.apache.org/)
+     (Apache License, Version 2.0) Apache Pulsar :: Bouncy Castle :: BC (org.apache.pulsar:bouncy-castle-bc:2.8.0 - https://github.com/apache/pulsar)
      (Apache License, Version 2.0) Apache Yetus - Audience Annotations (org.apache.yetus:audience-annotations:0.11.0 - https://yetus.apache.org/audience-annotations)
      (Apache License, Version 2.0) Apache Yetus - Audience Annotations (org.apache.yetus:audience-annotations:0.5.0 - https://yetus.apache.org/audience-annotations)
      (Apache License, Version 2.0) Apache Yetus - Audience Annotations (org.apache.yetus:audience-annotations:0.7.0 - https://yetus.apache.org/audience-annotations)
@@ -462,6 +463,7 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) Flink : Tools : Force Shading (org.apache.flink:force-shading:1.13.6 - https://www.apache.org/force-shading/)
      (Apache License, Version 2.0) Hibernate Validator Engine (org.hibernate:hibernate-validator:5.2.5.Final - http://hibernate.org/validator/hibernate-validator)
      (Apache License, Version 2.0) Hive Storage API (org.apache.hive:hive-storage-api:2.6.0 - https://www.apache.org/hive-storage-api/)
+     (Apache License, Version 2.0) JCIP Annotations under Apache License (com.github.stephenc.jcip:jcip-annotations:1.0 - http://stephenc.github.com/jcip-annotations)
      (Apache License, Version 2.0) JCIP Annotations under Apache License (com.github.stephenc.jcip:jcip-annotations:1.0-1 - http://stephenc.github.com/jcip-annotations)
      (Apache License, Version 2.0) JCL 1.2 implemented over SLF4J (org.slf4j:jcl-over-slf4j:1.7.30 - http://www.slf4j.org)
      (Apache License, Version 2.0) JMES Path Query library (com.amazonaws:jmespath-java:1.12.37 - https://aws.amazon.com/sdkforjava)
@@ -523,6 +525,9 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) ORC Shims (org.apache.orc:orc-shims:1.5.2 - http://orc.apache.org/orc-shims)
      (Apache License, Version 2.0) ORC Shims (org.apache.orc:orc-shims:1.5.6 - http://orc.apache.org/orc-shims)
      (Apache License, Version 2.0) Plexus Common Utilities (org.codehaus.plexus:plexus-utils:3.1.0 - http://codehaus-plexus.github.io/plexus-utils/)
+     (Apache License, Version 2.0) Pulsar Client :: API (org.apache.pulsar:pulsar-client-api:2.8.0 - https://github.com/apache/pulsar)
+     (Apache License, Version 2.0) Pulsar Client Admin :: API (org.apache.pulsar:pulsar-client-admin-api:2.8.0 - https://github.com/apache/pulsar)
+     (Apache License, Version 2.0) Pulsar Client All (org.apache.pulsar:pulsar-client-all:2.8.0 - https://github.com/apache/pulsar)
      (Apache License, Version 2.0) Sigar (org.hyperic:sigar:1.6.5.132 - https://github.com/hyperic/sigar)
      (Apache License, Version 2.0) SnakeYAML (org.yaml:snakeyaml:1.17 - http://www.snakeyaml.org)
      (Apache License, Version 2.0) SnakeYAML (org.yaml:snakeyaml:1.24 - http://www.snakeyaml.org)
@@ -907,6 +912,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (MIT License) JCodings (org.jruby.jcodings:jcodings:1.0.18 - http://nexus.sonatype.org/oss-repository-hosting.html/jcodings)
      (MIT License) JCodings (org.jruby.jcodings:jcodings:1.0.43 - http://nexus.sonatype.org/oss-repository-hosting.html/jcodings)
      (MIT License) JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.16 - http://www.slf4j.org)
+     (MIT License) JUL to SLF4J bridge (org.slf4j:jul-to-slf4j:1.7.25 - http://www.slf4j.org)
      (MIT License) Joni (org.jruby.joni:joni:2.1.11 - http://nexus.sonatype.org/oss-repository-hosting.html/joni)
      (MIT License) Joni (org.jruby.joni:joni:2.1.2 - http://nexus.sonatype.org/oss-repository-hosting.html/joni)
      (MIT License) Joni (org.jruby.joni:joni:2.1.27 - http://nexus.sonatype.org/oss-repository-hosting.html/joni)
@@ -920,6 +926,9 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.10.0 - https://checkerframework.org)
      (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.4.0 - https://checkerframework.org)
      (The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.2 - http://pholser.github.io/jopt-simple)
+     (Bouncy Castle License) The Bouncy Castle Crypto Package For Java (org.bouncycastle:bcpkix-jdk15on:1.68 - https://github.com/bcgit/bc-java)
+     (Bouncy Castle License) The Bouncy Castle Crypto Package For Java (org.bouncycastle:bcprov-ext-jdk15on:1.68 - https://github.com/bcgit/bc-java)
+     (Bouncy Castle License) The Bouncy Castle Crypto Package For Java (org.bouncycastle:bcprov-jdk15on:1.68 - https://github.com/bcgit/bc-java)
 
 
 ========================================================================
@@ -1057,7 +1066,10 @@ The following components are provided under the Eclipse Public License. See proj
 The text of each license is also included at licenses/LICENSE-[project].txt.
 
      (EDL 1.0) JavaBeans Activation Framework API jar (jakarta.activation:jakarta.activation-api:1.2.1 - https://github.com/eclipse-ee4j/jaf/jakarta.activation-api)
+     (EDL 1.0) JavaBeans Activation Framework API jar (jakarta.activation:jakarta.activation-api:1.2.2 - https://github.com/eclipse-ee4j/jaf/jakarta.activation-api)
+     (EPL 2.0) Jakarta RESTful Web Services API (jakarta.ws.rs:jakarta.ws.rs-api:2.1.6 - https://github.com/eclipse-ee4j/jaxrs-api)
      (Eclipse Distribution License - v 1.0) jakarta.xml.bind-api (jakarta.xml.bind:jakarta.xml.bind-api:2.3.2 - https://github.com/eclipse-ee4j/jaxb-api/jakarta.xml.bind-api)
+     (Eclipse Distribution License - v 1.0) jakarta.xml.bind-api (jakarta.xml.bind:jakarta.xml.bind-api:2.3.3 - https://github.com/eclipse-ee4j/jaxb-api/jakarta.xml.bind-api)
      (Eclipse Public License 1.0) JUnit (junit:junit:4.12 - http://junit.org)
      (Eclipse Public License, Version 1.0) Aether API (org.eclipse.aether:aether-api:0.9.0.M2 - http://www.eclipse.org/aether/aether-api/)
      (Eclipse Public License, Version 1.0) Aether Connector File (org.eclipse.aether:aether-connector-file:0.9.0.M2 - http://www.eclipse.org/aether/aether-connector-file/)
diff --git a/seatunnel-dist/release-docs/NOTICE b/seatunnel-dist/release-docs/NOTICE
index 632dfc1e6..4f0186eb7 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -4367,6 +4367,14 @@ The Apache Software Foundation (http://www.apache.org/).
 
 =========================================================================
 
+Apache Pulsar NOTICE
 
+=========================================================================
+
+Apache Pulsar
+Copyright 2017-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
 
 =========================================================================
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/licenses/LICENSE-bouncycastle.txt b/seatunnel-dist/release-docs/licenses/LICENSE-bouncycastle.txt
new file mode 100644
index 000000000..39847391f
--- /dev/null
+++ b/seatunnel-dist/release-docs/licenses/LICENSE-bouncycastle.txt
@@ -0,0 +1,7 @@
+Please note this should be read in the same way as the MIT license.
+Please also note this licensing model is made possible through funding from donations and the sale of support contracts.
+
+The Bouncy Castle License Copyright (c) 2000-2021 The Legion Of The Bouncy Castle Inc. (https://www.bouncycastle.org)
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/licenses/LICENSE-jaxrs-api.txt b/seatunnel-dist/release-docs/licenses/LICENSE-jaxrs-api.txt
new file mode 100644
index 000000000..5de3d1b40
--- /dev/null
+++ b/seatunnel-dist/release-docs/licenses/LICENSE-jaxrs-api.txt
@@ -0,0 +1,637 @@
+# Eclipse Public License - v 2.0
+
+        THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
+        PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION
+        OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
+
+    1. DEFINITIONS
+
+    "Contribution" means:
+
+      a) in the case of the initial Contributor, the initial content
+         Distributed under this Agreement, and
+
+      b) in the case of each subsequent Contributor: 
+         i) changes to the Program, and 
+         ii) additions to the Program;
+      where such changes and/or additions to the Program originate from
+      and are Distributed by that particular Contributor. A Contribution
+      "originates" from a Contributor if it was added to the Program by
+      such Contributor itself or anyone acting on such Contributor's behalf.
+      Contributions do not include changes or additions to the Program that
+      are not Modified Works.
+
+    "Contributor" means any person or entity that Distributes the Program.
+
+    "Licensed Patents" mean patent claims licensable by a Contributor which
+    are necessarily infringed by the use or sale of its Contribution alone
+    or when combined with the Program.
+
+    "Program" means the Contributions Distributed in accordance with this
+    Agreement.
+
+    "Recipient" means anyone who receives the Program under this Agreement
+    or any Secondary License (as applicable), including Contributors.
+
+    "Derivative Works" shall mean any work, whether in Source Code or other
+    form, that is based on (or derived from) the Program and for which the
+    editorial revisions, annotations, elaborations, or other modifications
+    represent, as a whole, an original work of authorship.
+
+    "Modified Works" shall mean any work in Source Code or other form that
+    results from an addition to, deletion from, or modification of the
+    contents of the Program, including, for purposes of clarity any new file
+    in Source Code form that contains any contents of the Program. Modified
+    Works shall not include works that contain only declarations,
+    interfaces, types, classes, structures, or files of the Program solely
+    in each case in order to link to, bind by name, or subclass the Program
+    or Modified Works thereof.
+
+    "Distribute" means the acts of a) distributing or b) making available
+    in any manner that enables the transfer of a copy.
+
+    "Source Code" means the form of a Program preferred for making
+    modifications, including but not limited to software source code,
+    documentation source, and configuration files.
+
+    "Secondary License" means either the GNU General Public License,
+    Version 2.0, or any later versions of that license, including any
+    exceptions or additional permissions as identified by the initial
+    Contributor.
+
+    2. GRANT OF RIGHTS
+
+      a) Subject to the terms of this Agreement, each Contributor hereby
+      grants Recipient a non-exclusive, worldwide, royalty-free copyright
+      license to reproduce, prepare Derivative Works of, publicly display,
+      publicly perform, Distribute and sublicense the Contribution of such
+      Contributor, if any, and such Derivative Works.
+
+      b) Subject to the terms of this Agreement, each Contributor hereby
+      grants Recipient a non-exclusive, worldwide, royalty-free patent
+      license under Licensed Patents to make, use, sell, offer to sell,
+      import and otherwise transfer the Contribution of such Contributor,
+      if any, in Source Code or other form. This patent license shall
+      apply to the combination of the Contribution and the Program if, at
+      the time the Contribution is added by the Contributor, such addition
+      of the Contribution causes such combination to be covered by the
+      Licensed Patents. The patent license shall not apply to any other
+      combinations which include the Contribution. No hardware per se is
+      licensed hereunder.
+
+      c) Recipient understands that although each Contributor grants the
+      licenses to its Contributions set forth herein, no assurances are
+      provided by any Contributor that the Program does not infringe the
+      patent or other intellectual property rights of any other entity.
+      Each Contributor disclaims any liability to Recipient for claims
+      brought by any other entity based on infringement of intellectual
+      property rights or otherwise. As a condition to exercising the
+      rights and licenses granted hereunder, each Recipient hereby
+      assumes sole responsibility to secure any other intellectual
+      property rights needed, if any. For example, if a third party
+      patent license is required to allow Recipient to Distribute the
+      Program, it is Recipient's responsibility to acquire that license
+      before distributing the Program.
+
+      d) Each Contributor represents that to its knowledge it has
+      sufficient copyright rights in its Contribution, if any, to grant
+      the copyright license set forth in this Agreement.
+
+      e) Notwithstanding the terms of any Secondary License, no
+      Contributor makes additional grants to any Recipient (other than
+      those set forth in this Agreement) as a result of such Recipient's
+      receipt of the Program under the terms of a Secondary License
+      (if permitted under the terms of Section 3).
+
+    3. REQUIREMENTS
+
+    3.1 If a Contributor Distributes the Program in any form, then:
+
+      a) the Program must also be made available as Source Code, in
+      accordance with section 3.2, and the Contributor must accompany
+      the Program with a statement that the Source Code for the Program
+      is available under this Agreement, and informs Recipients how to
+      obtain it in a reasonable manner on or through a medium customarily
+      used for software exchange; and
+
+      b) the Contributor may Distribute the Program under a license
+      different than this Agreement, provided that such license:
+         i) effectively disclaims on behalf of all other Contributors all
+         warranties and conditions, express and implied, including
+         warranties or conditions of title and non-infringement, and
+         implied warranties or conditions of merchantability and fitness
+         for a particular purpose;
+
+         ii) effectively excludes on behalf of all other Contributors all
+         liability for damages, including direct, indirect, special,
+         incidental and consequential damages, such as lost profits;
+
+         iii) does not attempt to limit or alter the recipients' rights
+         in the Source Code under section 3.2; and
+
+         iv) requires any subsequent distribution of the Program by any
+         party to be under a license that satisfies the requirements
+         of this section 3.
+
+    3.2 When the Program is Distributed as Source Code:
+
+      a) it must be made available under this Agreement, or if the
+      Program (i) is combined with other material in a separate file or
+      files made available under a Secondary License, and (ii) the initial
+      Contributor attached to the Source Code the notice described in
+      Exhibit A of this Agreement, then the Program may be made available
+      under the terms of such Secondary Licenses, and
+
+      b) a copy of this Agreement must be included with each copy of
+      the Program.
+
+    3.3 Contributors may not remove or alter any copyright, patent,
+    trademark, attribution notices, disclaimers of warranty, or limitations
+    of liability ("notices") contained within the Program from any copy of
+    the Program which they Distribute, provided that Contributors may add
+    their own appropriate notices.
+
+    4. COMMERCIAL DISTRIBUTION
+
+    Commercial distributors of software may accept certain responsibilities
+    with respect to end users, business partners and the like. While this
+    license is intended to facilitate the commercial use of the Program,
+    the Contributor who includes the Program in a commercial product
+    offering should do so in a manner which does not create potential
+    liability for other Contributors. Therefore, if a Contributor includes
+    the Program in a commercial product offering, such Contributor
+    ("Commercial Contributor") hereby agrees to defend and indemnify every
+    other Contributor ("Indemnified Contributor") against any losses,
+    damages and costs (collectively "Losses") arising from claims, lawsuits
+    and other legal actions brought by a third party against the Indemnified
+    Contributor to the extent caused by the acts or omissions of such
+    Commercial Contributor in connection with its distribution of the Program
+    in a commercial product offering. The obligations in this section do not
+    apply to any claims or Losses relating to any actual or alleged
+    intellectual property infringement. In order to qualify, an Indemnified
+    Contributor must: a) promptly notify the Commercial Contributor in
+    writing of such claim, and b) allow the Commercial Contributor to control,
+    and cooperate with the Commercial Contributor in, the defense and any
+    related settlement negotiations. The Indemnified Contributor may
+    participate in any such claim at its own expense.
+
+    For example, a Contributor might include the Program in a commercial
+    product offering, Product X. That Contributor is then a Commercial
+    Contributor. If that Commercial Contributor then makes performance
+    claims, or offers warranties related to Product X, those performance
+    claims and warranties are such Commercial Contributor's responsibility
+    alone. Under this section, the Commercial Contributor would have to
+    defend claims against the other Contributors related to those performance
+    claims and warranties, and if a court requires any other Contributor to
+    pay any damages as a result, the Commercial Contributor must pay
+    those damages.
+
+    5. NO WARRANTY
+
+    EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
+    PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS"
+    BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR
+    IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF
+    TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
+    PURPOSE. Each Recipient is solely responsible for determining the
+    appropriateness of using and distributing the Program and assumes all
+    risks associated with its exercise of rights under this Agreement,
+    including but not limited to the risks and costs of program errors,
+    compliance with applicable laws, damage to or loss of data, programs
+    or equipment, and unavailability or interruption of operations.
+
+    6. DISCLAIMER OF LIABILITY
+
+    EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
+    PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS
+    SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+    EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST
+    PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+    CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+    ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE
+    EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE
+    POSSIBILITY OF SUCH DAMAGES.
+
+    7. GENERAL
+
+    If any provision of this Agreement is invalid or unenforceable under
+    applicable law, it shall not affect the validity or enforceability of
+    the remainder of the terms of this Agreement, and without further
+    action by the parties hereto, such provision shall be reformed to the
+    minimum extent necessary to make such provision valid and enforceable.
+
+    If Recipient institutes patent litigation against any entity
+    (including a cross-claim or counterclaim in a lawsuit) alleging that the
+    Program itself (excluding combinations of the Program with other software
+    or hardware) infringes such Recipient's patent(s), then such Recipient's
+    rights granted under Section 2(b) shall terminate as of the date such
+    litigation is filed.
+
+    All Recipient's rights under this Agreement shall terminate if it
+    fails to comply with any of the material terms or conditions of this
+    Agreement and does not cure such failure in a reasonable period of
+    time after becoming aware of such noncompliance. If all Recipient's
+    rights under this Agreement terminate, Recipient agrees to cease use
+    and distribution of the Program as soon as reasonably practicable.
+    However, Recipient's obligations under this Agreement and any licenses
+    granted by Recipient relating to the Program shall continue and survive.
+
+    Everyone is permitted to copy and distribute copies of this Agreement,
+    but in order to avoid inconsistency the Agreement is copyrighted and
+    may only be modified in the following manner. The Agreement Steward
+    reserves the right to publish new versions (including revisions) of
+    this Agreement from time to time. No one other than the Agreement
+    Steward has the right to modify this Agreement. The Eclipse Foundation
+    is the initial Agreement Steward. The Eclipse Foundation may assign the
+    responsibility to serve as the Agreement Steward to a suitable separate
+    entity. Each new version of the Agreement will be given a distinguishing
+    version number. The Program (including Contributions) may always be
+    Distributed subject to the version of the Agreement under which it was
+    received. In addition, after a new version of the Agreement is published,
+    Contributor may elect to Distribute the Program (including its
+    Contributions) under the new version.
+
+    Except as expressly stated in Sections 2(a) and 2(b) above, Recipient
+    receives no rights or licenses to the intellectual property of any
+    Contributor under this Agreement, whether expressly, by implication,
+    estoppel or otherwise. All rights in the Program not expressly granted
+    under this Agreement are reserved. Nothing in this Agreement is intended
+    to be enforceable by any entity that is not a Contributor or Recipient.
+    No third-party beneficiary rights are created under this Agreement.
+
+    Exhibit A - Form of Secondary Licenses Notice
+
+    "This Source Code may also be made available under the following 
+    Secondary Licenses when the conditions for such availability set forth 
+    in the Eclipse Public License, v. 2.0 are satisfied: {name license(s),
+    version(s), and exceptions or additional permissions here}."
+
+      Simply including a copy of this Agreement, including this Exhibit A
+      is not sufficient to license the Source Code under Secondary Licenses.
+
+      If it is not possible or desirable to put the notice in a particular
+      file, then You may include the notice in a location (such as a LICENSE
+      file in a relevant directory) where a recipient would be likely to
+      look for such a notice.
+
+      You may add additional accurate notices of copyright ownership.
+
+---
+
+##    The GNU General Public License (GPL) Version 2, June 1991
+
+    Copyright (C) 1989, 1991 Free Software Foundation, Inc.
+    51 Franklin Street, Fifth Floor
+    Boston, MA 02110-1335
+    USA
+
+    Everyone is permitted to copy and distribute verbatim copies
+    of this license document, but changing it is not allowed.
+
+    Preamble
+
+    The licenses for most software are designed to take away your freedom to
+    share and change it. By contrast, the GNU General Public License is
+    intended to guarantee your freedom to share and change free software--to
+    make sure the software is free for all its users. This General Public
+    License applies to most of the Free Software Foundation's software and
+    to any other program whose authors commit to using it. (Some other Free
+    Software Foundation software is covered by the GNU Library General
+    Public License instead.) You can apply it to your programs, too.
+
+    When we speak of free software, we are referring to freedom, not price.
+    Our General Public Licenses are designed to make sure that you have the
+    freedom to distribute copies of free software (and charge for this
+    service if you wish), that you receive source code or can get it if you
+    want it, that you can change the software or use pieces of it in new
+    free programs; and that you know you can do these things.
+
+    To protect your rights, we need to make restrictions that forbid anyone
+    to deny you these rights or to ask you to surrender the rights. These
+    restrictions translate to certain responsibilities for you if you
+    distribute copies of the software, or if you modify it.
+
+    For example, if you distribute copies of such a program, whether gratis
+    or for a fee, you must give the recipients all the rights that you have.
+    You must make sure that they, too, receive or can get the source code.
+    And you must show them these terms so they know their rights.
+
+    We protect your rights with two steps: (1) copyright the software, and
+    (2) offer you this license which gives you legal permission to copy,
+    distribute and/or modify the software.
+
+    Also, for each author's protection and ours, we want to make certain
+    that everyone understands that there is no warranty for this free
+    software. If the software is modified by someone else and passed on, we
+    want its recipients to know that what they have is not the original, so
+    that any problems introduced by others will not reflect on the original
+    authors' reputations.
+
+    Finally, any free program is threatened constantly by software patents.
+    We wish to avoid the danger that redistributors of a free program will
+    individually obtain patent licenses, in effect making the program
+    proprietary. To prevent this, we have made it clear that any patent must
+    be licensed for everyone's free use or not licensed at all.
+
+    The precise terms and conditions for copying, distribution and
+    modification follow.
+
+    TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
+
+    0. This License applies to any program or other work which contains a
+    notice placed by the copyright holder saying it may be distributed under
+    the terms of this General Public License. The "Program", below, refers
+    to any such program or work, and a "work based on the Program" means
+    either the Program or any derivative work under copyright law: that is
+    to say, a work containing the Program or a portion of it, either
+    verbatim or with modifications and/or translated into another language.
+    (Hereinafter, translation is included without limitation in the term
+    "modification".) Each licensee is addressed as "you".
+
+    Activities other than copying, distribution and modification are not
+    covered by this License; they are outside its scope. The act of running
+    the Program is not restricted, and the output from the Program is
+    covered only if its contents constitute a work based on the Program
+    (independent of having been made by running the Program). Whether that
+    is true depends on what the Program does.
+
+    1. You may copy and distribute verbatim copies of the Program's source
+    code as you receive it, in any medium, provided that you conspicuously
+    and appropriately publish on each copy an appropriate copyright notice
+    and disclaimer of warranty; keep intact all the notices that refer to
+    this License and to the absence of any warranty; and give any other
+    recipients of the Program a copy of this License along with the Program.
+
+    You may charge a fee for the physical act of transferring a copy, and
+    you may at your option offer warranty protection in exchange for a fee.
+
+    2. You may modify your copy or copies of the Program or any portion of
+    it, thus forming a work based on the Program, and copy and distribute
+    such modifications or work under the terms of Section 1 above, provided
+    that you also meet all of these conditions:
+
+        a) You must cause the modified files to carry prominent notices
+        stating that you changed the files and the date of any change.
+
+        b) You must cause any work that you distribute or publish, that in
+        whole or in part contains or is derived from the Program or any part
+        thereof, to be licensed as a whole at no charge to all third parties
+        under the terms of this License.
+
+        c) If the modified program normally reads commands interactively
+        when run, you must cause it, when started running for such
+        interactive use in the most ordinary way, to print or display an
+        announcement including an appropriate copyright notice and a notice
+        that there is no warranty (or else, saying that you provide a
+        warranty) and that users may redistribute the program under these
+        conditions, and telling the user how to view a copy of this License.
+        (Exception: if the Program itself is interactive but does not
+        normally print such an announcement, your work based on the Program
+        is not required to print an announcement.)
+
+    These requirements apply to the modified work as a whole. If
+    identifiable sections of that work are not derived from the Program, and
+    can be reasonably considered independent and separate works in
+    themselves, then this License, and its terms, do not apply to those
+    sections when you distribute them as separate works. But when you
+    distribute the same sections as part of a whole which is a work based on
+    the Program, the distribution of the whole must be on the terms of this
+    License, whose permissions for other licensees extend to the entire
+    whole, and thus to each and every part regardless of who wrote it.
+
+    Thus, it is not the intent of this section to claim rights or contest
+    your rights to work written entirely by you; rather, the intent is to
+    exercise the right to control the distribution of derivative or
+    collective works based on the Program.
+
+    In addition, mere aggregation of another work not based on the Program
+    with the Program (or with a work based on the Program) on a volume of a
+    storage or distribution medium does not bring the other work under the
+    scope of this License.
+
+    3. You may copy and distribute the Program (or a work based on it,
+    under Section 2) in object code or executable form under the terms of
+    Sections 1 and 2 above provided that you also do one of the following:
+
+        a) Accompany it with the complete corresponding machine-readable
+        source code, which must be distributed under the terms of Sections 1
+        and 2 above on a medium customarily used for software interchange; or,
+
+        b) Accompany it with a written offer, valid for at least three
+        years, to give any third party, for a charge no more than your cost
+        of physically performing source distribution, a complete
+        machine-readable copy of the corresponding source code, to be
+        distributed under the terms of Sections 1 and 2 above on a medium
+        customarily used for software interchange; or,
+
+        c) Accompany it with the information you received as to the offer to
+        distribute corresponding source code. (This alternative is allowed
+        only for noncommercial distribution and only if you received the
+        program in object code or executable form with such an offer, in
+        accord with Subsection b above.)
+
+    The source code for a work means the preferred form of the work for
+    making modifications to it. For an executable work, complete source code
+    means all the source code for all modules it contains, plus any
+    associated interface definition files, plus the scripts used to control
+    compilation and installation of the executable. However, as a special
+    exception, the source code distributed need not include anything that is
+    normally distributed (in either source or binary form) with the major
+    components (compiler, kernel, and so on) of the operating system on
+    which the executable runs, unless that component itself accompanies the
+    executable.
+
+    If distribution of executable or object code is made by offering access
+    to copy from a designated place, then offering equivalent access to copy
+    the source code from the same place counts as distribution of the source
+    code, even though third parties are not compelled to copy the source
+    along with the object code.
+
+    4. You may not copy, modify, sublicense, or distribute the Program
+    except as expressly provided under this License. Any attempt otherwise
+    to copy, modify, sublicense or distribute the Program is void, and will
+    automatically terminate your rights under this License. However, parties
+    who have received copies, or rights, from you under this License will
+    not have their licenses terminated so long as such parties remain in
+    full compliance.
+
+    5. You are not required to accept this License, since you have not
+    signed it. However, nothing else grants you permission to modify or
+    distribute the Program or its derivative works. These actions are
+    prohibited by law if you do not accept this License. Therefore, by
+    modifying or distributing the Program (or any work based on the
+    Program), you indicate your acceptance of this License to do so, and all
+    its terms and conditions for copying, distributing or modifying the
+    Program or works based on it.
+
+    6. Each time you redistribute the Program (or any work based on the
+    Program), the recipient automatically receives a license from the
+    original licensor to copy, distribute or modify the Program subject to
+    these terms and conditions. You may not impose any further restrictions
+    on the recipients' exercise of the rights granted herein. You are not
+    responsible for enforcing compliance by third parties to this License.
+
+    7. If, as a consequence of a court judgment or allegation of patent
+    infringement or for any other reason (not limited to patent issues),
+    conditions are imposed on you (whether by court order, agreement or
+    otherwise) that contradict the conditions of this License, they do not
+    excuse you from the conditions of this License. If you cannot distribute
+    so as to satisfy simultaneously your obligations under this License and
+    any other pertinent obligations, then as a consequence you may not
+    distribute the Program at all. For example, if a patent license would
+    not permit royalty-free redistribution of the Program by all those who
+    receive copies directly or indirectly through you, then the only way you
+    could satisfy both it and this License would be to refrain entirely from
+    distribution of the Program.
+
+    If any portion of this section is held invalid or unenforceable under
+    any particular circumstance, the balance of the section is intended to
+    apply and the section as a whole is intended to apply in other
+    circumstances.
+
+    It is not the purpose of this section to induce you to infringe any
+    patents or other property right claims or to contest validity of any
+    such claims; this section has the sole purpose of protecting the
+    integrity of the free software distribution system, which is implemented
+    by public license practices. Many people have made generous
+    contributions to the wide range of software distributed through that
+    system in reliance on consistent application of that system; it is up to
+    the author/donor to decide if he or she is willing to distribute
+    software through any other system and a licensee cannot impose that choice.
+
+    This section is intended to make thoroughly clear what is believed to be
+    a consequence of the rest of this License.
+
+    8. If the distribution and/or use of the Program is restricted in
+    certain countries either by patents or by copyrighted interfaces, the
+    original copyright holder who places the Program under this License may
+    add an explicit geographical distribution limitation excluding those
+    countries, so that distribution is permitted only in or among countries
+    not thus excluded. In such case, this License incorporates the
+    limitation as if written in the body of this License.
+
+    9. The Free Software Foundation may publish revised and/or new
+    versions of the General Public License from time to time. Such new
+    versions will be similar in spirit to the present version, but may
+    differ in detail to address new problems or concerns.
+
+    Each version is given a distinguishing version number. If the Program
+    specifies a version number of this License which applies to it and "any
+    later version", you have the option of following the terms and
+    conditions either of that version or of any later version published by
+    the Free Software Foundation. If the Program does not specify a version
+    number of this License, you may choose any version ever published by the
+    Free Software Foundation.
+
+    10. If you wish to incorporate parts of the Program into other free
+    programs whose distribution conditions are different, write to the
+    author to ask for permission. For software which is copyrighted by the
+    Free Software Foundation, write to the Free Software Foundation; we
+    sometimes make exceptions for this. Our decision will be guided by the
+    two goals of preserving the free status of all derivatives of our free
+    software and of promoting the sharing and reuse of software generally.
+
+    NO WARRANTY
+
+    11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO
+    WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW.
+    EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR
+    OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND,
+    EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE
+    ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH
+    YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL
+    NECESSARY SERVICING, REPAIR OR CORRECTION.
+
+    12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN
+    WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY
+    AND/OR REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR
+    DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL
+    DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE PROGRAM
+    (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED
+    INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF
+    THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), EVEN IF SUCH HOLDER OR
+    OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
+
+    END OF TERMS AND CONDITIONS
+
+    How to Apply These Terms to Your New Programs
+
+    If you develop a new program, and you want it to be of the greatest
+    possible use to the public, the best way to achieve this is to make it
+    free software which everyone can redistribute and change under these terms.
+
+    To do so, attach the following notices to the program. It is safest to
+    attach them to the start of each source file to most effectively convey
+    the exclusion of warranty; and each file should have at least the
+    "copyright" line and a pointer to where the full notice is found.
+
+        One line to give the program's name and a brief idea of what it does.
+        Copyright (C) <year> <name of author>
+
+        This program is free software; you can redistribute it and/or modify
+        it under the terms of the GNU General Public License as published by
+        the Free Software Foundation; either version 2 of the License, or
+        (at your option) any later version.
+
+        This program is distributed in the hope that it will be useful, but
+        WITHOUT ANY WARRANTY; without even the implied warranty of
+        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+        General Public License for more details.
+
+        You should have received a copy of the GNU General Public License
+        along with this program; if not, write to the Free Software
+        Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
+
+    Also add information on how to contact you by electronic and paper mail.
+
+    If the program is interactive, make it output a short notice like this
+    when it starts in an interactive mode:
+
+        Gnomovision version 69, Copyright (C) year name of author
+        Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type
+        `show w'. This is free software, and you are welcome to redistribute
+        it under certain conditions; type `show c' for details.
+
+    The hypothetical commands `show w' and `show c' should show the
+    appropriate parts of the General Public License. Of course, the commands
+    you use may be called something other than `show w' and `show c'; they
+    could even be mouse-clicks or menu items--whatever suits your program.
+
+    You should also get your employer (if you work as a programmer) or your
+    school, if any, to sign a "copyright disclaimer" for the program, if
+    necessary. Here is a sample; alter the names:
+
+        Yoyodyne, Inc., hereby disclaims all copyright interest in the
+        program `Gnomovision' (which makes passes at compilers) written by
+        James Hacker.
+
+        signature of Ty Coon, 1 April 1989
+        Ty Coon, President of Vice
+
+    This General Public License does not permit incorporating your program
+    into proprietary programs. If your program is a subroutine library, you
+    may consider it more useful to permit linking proprietary applications
+    with the library. If this is what you want to do, use the GNU Library
+    General Public License instead of this License.
+
+---
+
+## CLASSPATH EXCEPTION
+
+    Linking this library statically or dynamically with other modules is
+    making a combined work based on this library.  Thus, the terms and
+    conditions of the GNU General Public License version 2 cover the whole
+    combination.
+
+    As a special exception, the copyright holders of this library give you
+    permission to link this library with independent modules to produce an
+    executable, regardless of the license terms of these independent
+    modules, and to copy and distribute the resulting executable under
+    terms of your choice, provided that you also meet, for each linked
+    independent module, the terms and conditions of the license of that
+    module.  An independent module is a module which is not derived from or
+    based on this library.  If you modify this library, you may extend this
+    exception to your version of the library, but you are not obligated to
+    do so.  If you do not wish to do so, delete this exception statement
+    from your version.
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 62d79b2d8..f83268da9 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -50,6 +50,10 @@ aws-java-sdk-core-1.12.37.jar
 aws-java-sdk-ec2-1.12.37.jar
 aws-java-sdk-kms-1.12.37.jar
 aws-java-sdk-s3-1.12.37.jar
+bcpkix-jdk15on-1.68.jar
+bcprov-ext-jdk15on-1.68.jar
+bcprov-jdk15on-1.68.jar
+bouncy-castle-bc-2.8.0-pkg.jar
 caffeine-2.8.0.jar
 caffeine-2.8.4.jar
 calcite-core-1.29.0.jar
@@ -329,7 +333,10 @@ jackson-module-jaxb-annotations-2.7.8.jar
 jackson-xc-1.9.13.jar
 jackson-xc-1.9.2.jar
 jakarta.activation-api-1.2.1.jar
+jakarta.activation-api-1.2.2.jar
+jakarta.ws.rs-api-2.1.6.jar
 jakarta.xml.bind-api-2.3.2.jar
+jakarta.xml.bind-api-2.3.3.jar
 jamon-runtime-2.4.1.jar
 janino-3.0.9.jar
 janino-3.1.6.jar
@@ -356,6 +363,7 @@ jaxb-api-2.3.1.jar
 jaxb-impl-2.2.3-1.jar
 jboss-logging-3.2.1.Final.jar
 jcip-annotations-1.0-1.jar
+jcip-annotations-1.0.jar
 jcl-over-slf4j-1.7.12.jar
 jcl-over-slf4j-1.7.16.jar
 jcl-over-slf4j-1.7.30.jar
@@ -435,6 +443,7 @@ jsp-api-2.1.jar
 jsr305-1.3.9.jar
 jsr305-2.0.1.jar
 jsr311-api-1.1.1.jar
+jul-to-slf4j-1.7.25.jar
 jvm-attach-api-1.5.jar
 kafka-clients-2.0.0.jar
 kafka-clients-2.4.1.jar
@@ -605,6 +614,9 @@ poi-ooxml-4.1.2.jar
 poi-ooxml-schemas-4.1.2.jar
 protobuf-java-2.5.0.jar
 protobuf-java-3.17.1.jar
+pulsar-client-admin-api-2.8.0.jar
+pulsar-client-all-2.8.0.jar
+pulsar-client-api-2.8.0.jar
 rank-eval-client-6.3.1.jar
 rank-eval-client-7.5.1.jar
 re2j-1.1.jar