You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/07 02:03:49 UTC

[incubator-seatunnel] branch dev updated: [doc][connector-v2] pulsar source options doc (#2128)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 59ce8a2b3 [doc][connector-v2] pulsar source options doc (#2128)
59ce8a2b3 is described below

commit 59ce8a2b321d6a5f0503eb78b7c5df43fd245782
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Jul 7 10:03:45 2022 +0800

    [doc][connector-v2] pulsar source options doc (#2128)
    
    * [doc][connector-v2] pulsar source options doc
    
    Simplify option's name and add detailed documentation
    
    * [doc][connector-v2] Standardized Pulsar document
---
 docs/en/connector-v2/source/pulsar.md              | 126 +++++++++++++++++++++
 plugin-mapping.properties                          |   1 +
 .../seatunnel/pulsar/config/SourceProperties.java  |  55 ++++-----
 .../seatunnel/pulsar/source/PulsarSource.java      |  82 +++++++-------
 4 files changed, 189 insertions(+), 75 deletions(-)

diff --git a/docs/en/connector-v2/source/pulsar.md b/docs/en/connector-v2/source/pulsar.md
new file mode 100644
index 000000000..68f9da4df
--- /dev/null
+++ b/docs/en/connector-v2/source/pulsar.md
@@ -0,0 +1,126 @@
+# Apache Pulsar
+
+## Description
+
+Source connector for Apache Pulsar. It can support both off-line and real-time jobs.
+
+##  Options
+
+| name | type | required | default value |
+| --- | --- | --- | --- |
+| topic | String | No | - |
+| topic-pattern | String | No | - |
+| topic-discovery.interval | Long | No | 30000 |
+| subscription.name | String | Yes | - |
+| client.service-url | String | Yes | - |
+| admin.service-url | String | Yes | - |
+| auth.plugin-class | String | No | - |
+| auth.params | String | No | - |
+| poll.timeout | Integer | No | 100 |
+| poll.interval | Long | No | 50 |
+| poll.batch.size | Integer | No | 500 |
+| cursor.startup.mode | Enum | No | LATEST |
+| cursor.startup.timestamp | Long | No | - |
+| cursor.reset.mode | Enum | No | LATEST |
+| cursor.stop.mode | Enum | No | NEVER |
+| cursor.stop.timestamp | Long | No | - |
+
+### topic [String]
+
+Topic name(s) to read data from when the table is used as source.  It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. 
+
+**Note, only one of "topic-pattern" and "topic" can be specified for sources.**
+
+### topic-pattern [String]
+
+The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running.
+
+**Note, only one of "topic-pattern" and "topic" can be specified for sources.**
+
+### topic-discovery.interval [Long]
+
+The interval (in ms) for the Pulsar source to discover the new topic partitions. A non-positive value disables the topic partition discovery.
+
+**Note, This option only works if the 'topic-pattern' option is used.**
+
+### subscription.name [String]
+
+Specify the subscription name for this consumer. This argument is required when constructing the consumer.
+
+### client.service-url [String]
+
+Service URL provider for Pulsar service.
+To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.
+You can assign Pulsar protocol URLs to specific clusters and use the Pulsar scheme.
+
+For example, `localhost`: `pulsar://localhost:6650,localhost:6651`.
+
+### admin.service-url [String]
+
+The Pulsar service HTTP URL for the admin endpoint. 
+
+For example, `http://my-broker.example.com:8080`, or `https://my-broker.example.com:8443` for TLS.
+
+### auth.plugin-class [String]
+
+Name of the authentication plugin.
+
+### auth.params [String]
+
+Parameters for the authentication plugin.
+
+For example, `key1:val1,key2:val2`
+
+### poll.timeout [Integer]
+
+The maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency.
+
+### poll.interval [Long]
+
+The interval time(in ms) when fetcing records.  A shorter time increases throughput, but also increases CPU load.
+
+### poll.batch.size [Integer]
+
+The maximum number of records to fetch to wait when polling. A longer time increases throughput but also latency.
+
+### cursor.startup.mode [Enum]
+
+Startup mode for Pulsar consumer, valid values are `'EARLIEST'`, `'LATEST'`, `'SUBSCRIPTION'`, `'TIMESTAMP'`.
+
+### cursor.startup.timestamp [String]
+
+Start from the specified epoch timestamp (in milliseconds).
+
+**Note, This option is required when the "cursor.startup.mode" option used `'TIMESTAMP'`.**
+
+### cursor.reset.mode [Enum]
+
+Cursor reset strategy for Pulsar consumer valid values are `'EARLIEST'`, `'LATEST'`.
+
+**Note, This option only works if the "cursor.startup.mode" option used `'SUBSCRIPTION'`.**
+
+### cursor.stop.mode [String]
+
+Stop mode for Pulsar consumer, valid values are `'NEVER'`, `'LATEST'`and `'TIMESTAMP'`.
+
+**Note, When `'NEVER' `is specified, it is a real-time job, and other mode are off-line jobs.**
+
+### cursor.startup.timestamp [String]
+
+Stop from the specified epoch timestamp (in milliseconds).
+
+**Note, This option is required when the "cursor.stop.mode" option used `'TIMESTAMP'`.**
+
+## Example
+
+```Jdbc {
+source {
+  Pulsar {
+  	topic = "example"
+  	subscription.name = "seatunnel"
+    client.service-url = "localhost:pulsar://localhost:6650"
+    admin.service-url = "http://my-broker.example.com:8080"
+    result_table_name = "test"
+  }
+}
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index bd38586a9..2a43975d0 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -102,3 +102,4 @@ seatunnel.sink.Clickhouse = connector-clickhouse
 seatunnel.sink.ClickhouseFile = connector-clickhouse
 seatunnel.source.Jdbc = connector-jdbc
 seatunnel.sink.Jdbc = connector-jdbc
+seatunnel.source.Pulsar = connector-pulsar
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
index e86923220..92453efee 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
@@ -19,61 +19,48 @@ package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
 
 public class SourceProperties {
 
-    // Pulsar client API config prefix.
-    public static final String CLIENT_CONFIG_PREFIX = "pulsar.client.";
-    // Pulsar admin API config prefix.
-    public static final String ADMIN_CONFIG_PREFIX = "pulsar.admin.";
-
     // --------------------------------------------------------------------------------------------
     // The configuration for ClientConfigurationData part.
-    // All the configuration listed below should have the pulsar.client prefix.
     // --------------------------------------------------------------------------------------------
 
-    public static final String PULSAR_SERVICE_URL = CLIENT_CONFIG_PREFIX + "serviceUrl";
-    public static final String PULSAR_AUTH_PLUGIN_CLASS_NAME = CLIENT_CONFIG_PREFIX + "authPluginClassName";
-    public static final String PULSAR_AUTH_PARAMS = CLIENT_CONFIG_PREFIX + "authParams";
+    public static final String CLIENT_SERVICE_URL = "client.service-url";
+    public static final String AUTH_PLUGIN_CLASS = "auth.plugin-class";
+    public static final String AUTH_PARAMS = "auth.params";
 
     // --------------------------------------------------------------------------------------------
     // The configuration for ClientConfigurationData part.
     // All the configuration listed below should have the pulsar.client prefix.
     // --------------------------------------------------------------------------------------------
 
-    public static final String PULSAR_ADMIN_URL = ADMIN_CONFIG_PREFIX + "adminUrl";
-
-    // Pulsar source connector config prefix.
-    public static final String SOURCE_CONFIG_PREFIX = "pulsar.source.";
-    // Pulsar consumer API config prefix.
-    public static final String CONSUMER_CONFIG_PREFIX = "pulsar.consumer.";
+    public static final String ADMIN_SERVICE_URL = "admin.service-url";
 
     // --------------------------------------------------------------------------------------------
     // The configuration for ConsumerConfigurationData part.
-    // All the configuration listed below should have the pulsar.consumer prefix.
     // --------------------------------------------------------------------------------------------
 
-    public static final String PULSAR_SUBSCRIPTION_NAME = CONSUMER_CONFIG_PREFIX + "subscriptionName";
-    public static final String PULSAR_SUBSCRIPTION_TYPE = CONSUMER_CONFIG_PREFIX + "subscriptionType";
-    public static final String PULSAR_SUBSCRIPTION_MODE = CONSUMER_CONFIG_PREFIX + "subscriptionMode";
+    public static final String SUBSCRIPTION_NAME = "subscription.name";
+    public static final String SUBSCRIPTION_TYPE = "subscription.type";
+    public static final String SUBSCRIPTION_MODE = "subscription.mode";
 
     // --------------------------------------------------------------------------------------------
     // The configuration for pulsar source part.
-    // All the configuration listed below should have the pulsar.source prefix.
     // --------------------------------------------------------------------------------------------
 
-    public static final String PULSAR_PARTITION_DISCOVERY_INTERVAL_MS = SOURCE_CONFIG_PREFIX + "partitionDiscoveryIntervalMs";
-    public static final String PULSAR_TOPIC = SOURCE_CONFIG_PREFIX + "topic";
-    public static final String PULSAR_TOPIC_PATTERN = SOURCE_CONFIG_PREFIX + "topic.pattern";
-    public static final String PULSAR_POLL_TIMEOUT = SOURCE_CONFIG_PREFIX + "poll.timeout";
-    public static final String PULSAR_POLL_INTERVAL = SOURCE_CONFIG_PREFIX + "poll.interval";
-    public static final String PULSAR_BATCH_SIZE = SOURCE_CONFIG_PREFIX + "batch.size";
-    public static final String PULSAR_CURSOR_START_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.start.mode";
-    public static final String PULSAR_CURSOR_START_RESET_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.start.reset.mode";
-    public static final String PULSAR_CURSOR_START_TIMESTAMP = SOURCE_CONFIG_PREFIX + "scan.cursor.start.timestamp";
-    public static final String PULSAR_CURSOR_START_ID = SOURCE_CONFIG_PREFIX + "scan.cursor.start.id";
-    public static final String PULSAR_CURSOR_STOP_MODE = SOURCE_CONFIG_PREFIX + "scan.cursor.stop.mode";
-    public static final String PULSAR_CURSOR_STOP_TIMESTAMP = SOURCE_CONFIG_PREFIX + "scan.cursor.stop.timestamp";
+    public static final String TOPIC_DISCOVERY_INTERVAL = "topic-discovery.interval";
+    public static final String TOPIC = "topic";
+    public static final String TOPIC_PATTERN = "topic-pattern";
+    public static final String POLL_TIMEOUT = "poll.timeout";
+    public static final String POLL_INTERVAL = "poll.interval";
+    public static final String POLL_BATCH_SIZE = "poll.batch.size";
+    public static final String CURSOR_STARTUP_MODE = "cursor.startup.mode";
+    public static final String CURSOR_RESET_MODE = "cursor.reset.mode";
+    public static final String CURSOR_STARTUP_TIMESTAMP = "cursor.startup.timestamp";
+    public static final String CURSOR_STARTUP_ID = "cursor.startup.id";
+    public static final String CURSOR_STOP_MODE = "cursor.stop.mode";
+    public static final String CURSOR_STOP_TIMESTAMP = "cursor.stop.timestamp";
 
     /**
-     * Startup mode for the Kafka consumer, see {@link #PULSAR_CURSOR_START_MODE}.
+     * Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}.
      */
     public enum StartMode {
         /**
@@ -99,7 +86,7 @@ public class SourceProperties {
     }
 
     /**
-     * Startup mode for the Kafka consumer, see {@link #PULSAR_CURSOR_START_MODE}.
+     * Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}.
      */
     public enum StopMode {
         /**
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index 20028efb0..9247e1d2b 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -19,25 +19,25 @@ package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
 
 import static org.apache.seatunnel.common.PropertiesUtil.getEnum;
 import static org.apache.seatunnel.common.PropertiesUtil.setOption;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_ADMIN_URL;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_AUTH_PARAMS;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_AUTH_PLUGIN_CLASS_NAME;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_BATCH_SIZE;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_START_MODE;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_START_RESET_MODE;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_START_TIMESTAMP;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_STOP_MODE;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_CURSOR_STOP_TIMESTAMP;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_POLL_INTERVAL;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_POLL_TIMEOUT;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_SERVICE_URL;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_SUBSCRIPTION_NAME;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_TOPIC;
-import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.PULSAR_TOPIC_PATTERN;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_RESET_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode.LATEST;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StopMode.NEVER;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_DISCOVERY_INTERVAL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
@@ -100,48 +100,48 @@ public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit,
     @SuppressWarnings("checkstyle:MagicNumber")
     @Override
     public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, PULSAR_SUBSCRIPTION_NAME, PULSAR_SERVICE_URL, PULSAR_ADMIN_URL);
+        CheckResult result = CheckConfigUtil.checkAllExists(config, SUBSCRIPTION_NAME, CLIENT_SERVICE_URL, ADMIN_SERVICE_URL);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
 
         // admin config
         PulsarAdminConfig.Builder adminConfigBuilder = PulsarAdminConfig.builder()
-            .adminUrl(config.getString(PULSAR_ADMIN_URL));
-        setOption(config, PULSAR_AUTH_PLUGIN_CLASS_NAME, config::getString, adminConfigBuilder::authPluginClassName);
-        setOption(config, PULSAR_AUTH_PARAMS, config::getString, adminConfigBuilder::authParams);
+            .adminUrl(config.getString(ADMIN_SERVICE_URL));
+        setOption(config, AUTH_PLUGIN_CLASS, config::getString, adminConfigBuilder::authPluginClassName);
+        setOption(config, AUTH_PARAMS, config::getString, adminConfigBuilder::authParams);
         this.adminConfig = adminConfigBuilder.build();
 
         // client config
         PulsarClientConfig.Builder clientConfigBuilder = PulsarClientConfig.builder()
-            .serviceUrl(config.getString(PULSAR_SERVICE_URL));
-        setOption(config, PULSAR_AUTH_PLUGIN_CLASS_NAME, config::getString, clientConfigBuilder::authPluginClassName);
-        setOption(config, PULSAR_AUTH_PARAMS, config::getString, clientConfigBuilder::authParams);
+            .serviceUrl(config.getString(CLIENT_SERVICE_URL));
+        setOption(config, AUTH_PLUGIN_CLASS, config::getString, clientConfigBuilder::authPluginClassName);
+        setOption(config, AUTH_PARAMS, config::getString, clientConfigBuilder::authParams);
         this.clientConfig = clientConfigBuilder.build();
 
         // consumer config
         PulsarConsumerConfig.Builder consumerConfigBuilder = PulsarConsumerConfig.builder()
-            .subscriptionName(config.getString(PULSAR_SERVICE_URL));
+            .subscriptionName(config.getString(SUBSCRIPTION_NAME));
         this.consumerConfig = consumerConfigBuilder.build();
 
         // source properties
         setOption(config,
-            PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,
+            TOPIC_DISCOVERY_INTERVAL,
             30000L,
             config::getLong,
             v -> this.partitionDiscoveryIntervalMs = v);
         setOption(config,
-            PULSAR_POLL_TIMEOUT,
+            POLL_TIMEOUT,
             100,
             config::getInt,
             v -> this.pollTimeout = v);
         setOption(config,
-            PULSAR_POLL_INTERVAL,
+            POLL_INTERVAL,
             50L,
             config::getLong,
             v -> this.pollInterval = v);
         setOption(config,
-            PULSAR_BATCH_SIZE,
+            POLL_BATCH_SIZE,
             500,
             config::getInt,
             v -> this.batchSize = v);
@@ -159,7 +159,7 @@ public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit,
     }
 
     private void setStartCursor(Config config) {
-        StartMode startMode = getEnum(config, PULSAR_CURSOR_START_MODE, StartMode.class, LATEST);
+        StartMode startMode = getEnum(config, CURSOR_STARTUP_MODE, StartMode.class, LATEST);
         switch (startMode) {
             case EARLIEST:
                 this.startCursor = StartCursor.earliest();
@@ -169,16 +169,16 @@ public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit,
                 break;
             case SUBSCRIPTION:
                 SubscriptionStartCursor.CursorResetStrategy resetStrategy = getEnum(config,
-                    PULSAR_CURSOR_START_RESET_MODE,
+                    CURSOR_RESET_MODE,
                     SubscriptionStartCursor.CursorResetStrategy.class,
                     SubscriptionStartCursor.CursorResetStrategy.LATEST);
                 this.startCursor = StartCursor.subscription(resetStrategy);
                 break;
             case TIMESTAMP:
-                if (StringUtils.isBlank(config.getString(PULSAR_CURSOR_START_TIMESTAMP))) {
-                    throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", PULSAR_CURSOR_START_TIMESTAMP, PULSAR_CURSOR_START_MODE));
+                if (StringUtils.isBlank(config.getString(CURSOR_STARTUP_TIMESTAMP))) {
+                    throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", CURSOR_STARTUP_TIMESTAMP, CURSOR_STARTUP_MODE));
                 }
-                setOption(config, PULSAR_CURSOR_START_TIMESTAMP, config::getLong, timestamp -> this.startCursor = StartCursor.timestamp(timestamp));
+                setOption(config, CURSOR_STARTUP_TIMESTAMP, config::getLong, timestamp -> this.startCursor = StartCursor.timestamp(timestamp));
                 break;
             default:
                 throw new IllegalArgumentException(String.format("The %s mode is not supported.", startMode));
@@ -186,7 +186,7 @@ public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit,
     }
 
     private void setStopCursor(Config config) {
-        SourceProperties.StopMode stopMode = getEnum(config, PULSAR_CURSOR_STOP_MODE, SourceProperties.StopMode.class, NEVER);
+        SourceProperties.StopMode stopMode = getEnum(config, CURSOR_STOP_MODE, SourceProperties.StopMode.class, NEVER);
         switch (stopMode) {
             case LATEST:
                 this.stopCursor = StopCursor.latest();
@@ -195,10 +195,10 @@ public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit,
                 this.stopCursor = StopCursor.never();
                 break;
             case TIMESTAMP:
-                if (StringUtils.isBlank(config.getString(PULSAR_CURSOR_STOP_TIMESTAMP))) {
-                    throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", PULSAR_CURSOR_STOP_TIMESTAMP, PULSAR_CURSOR_STOP_MODE));
+                if (StringUtils.isBlank(config.getString(CURSOR_STOP_TIMESTAMP))) {
+                    throw new IllegalArgumentException(String.format("The '%s' property is required when the '%s' is 'timestamp'.", CURSOR_STOP_TIMESTAMP, CURSOR_STOP_MODE));
                 }
-                setOption(config, PULSAR_CURSOR_START_TIMESTAMP, config::getLong, timestamp -> this.stopCursor = StopCursor.timestamp(timestamp));
+                setOption(config, CURSOR_STARTUP_TIMESTAMP, config::getLong, timestamp -> this.stopCursor = StopCursor.timestamp(timestamp));
                 break;
             default:
                 throw new IllegalArgumentException(String.format("The %s mode is not supported.", stopMode));
@@ -206,19 +206,19 @@ public class PulsarSource<T> implements SeaTunnelSource<T, PulsarPartitionSplit,
     }
 
     private void setPartitionDiscoverer(Config config) {
-        String topic = config.getString(PULSAR_TOPIC);
+        String topic = config.getString(TOPIC);
         if (StringUtils.isNotBlank(topic)) {
             this.partitionDiscoverer = new TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ",")));
         }
-        String topicPattern = config.getString(PULSAR_TOPIC_PATTERN);
+        String topicPattern = config.getString(TOPIC_PATTERN);
         if (StringUtils.isNotBlank(topicPattern)) {
             if (this.partitionDiscoverer != null) {
-                throw new IllegalArgumentException(String.format("The properties '%s' and '%s' is exclusive.", PULSAR_TOPIC, PULSAR_TOPIC_PATTERN));
+                throw new IllegalArgumentException(String.format("The properties '%s' and '%s' is exclusive.", TOPIC, TOPIC_PATTERN));
             }
             this.partitionDiscoverer = new TopicPatternDiscoverer(Pattern.compile(topicPattern));
         }
         if (this.partitionDiscoverer == null) {
-            throw new IllegalArgumentException(String.format("The properties '%s' or '%s' is required.", PULSAR_TOPIC, PULSAR_TOPIC_PATTERN));
+            throw new IllegalArgumentException(String.format("The properties '%s' or '%s' is required.", TOPIC, TOPIC_PATTERN));
         }
     }