You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/05/22 04:05:27 UTC

[seatunnel] branch dev updated: [Feature][Json-format] support read format for pulsar (#4111)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7d61ae93e [Feature][Json-format] support read format for pulsar (#4111)
7d61ae93e is described below

commit 7d61ae93e72709238c54127a5ad62dde5df99130
Author: Laglangyue <35...@users.noreply.github.com>
AuthorDate: Mon May 22 12:05:20 2023 +0800

    [Feature][Json-format] support read format for pulsar (#4111)
---
 docs/en/Connector-v2-release-state.md              |   2 +-
 .../connector-v2/source/{pulsar.md => Pulsar.md}   |  14 +-
 release-note.md                                    |   1 +
 .../apache/seatunnel/common/utils/JsonUtils.java   |  15 +-
 seatunnel-connectors-v2/connector-pulsar/pom.xml   |   8 +-
 .../seatunnel/pulsar/config/SourceProperties.java  |  24 ++
 .../seatunnel/pulsar/source/PulsarSource.java      |  61 +++-
 .../source/enumerator/PulsarSplitEnumerator.java   |   7 +-
 .../enumerator/PulsarSplitEnumeratorState.java     |   2 +-
 .../pulsar/source/format/PulsarCanalDecorator.java | 103 +++++++
 .../pulsar/source/reader/PulsarSourceReader.java   | 121 ++++----
 .../source/reader/PulsarSplitReaderThread.java     |   3 +
 .../pulsar/source/PulsarCanalDecoratorTest.java    |  87 ++++++
 .../{ => connector-pulsar-e2e}/pom.xml             | 100 +++---
 .../e2e/connector/pulsar/CanalToPulsarIT.java      | 341 +++++++++++++++++++++
 .../e2e/connector/pulsar/PulsarBatchIT.java        | 179 +++++++++++
 .../test/resources/batch_pulsar_to_console.conf    | 162 ++++++++++
 .../src/test/resources/cdc_canal_pulsar_to_pg.conf |  65 ++++
 .../src/test/resources/ddl/canal.sql               |  47 +++
 .../src/test/resources/mysql/server-gtids/my.cnf   |  60 ++++
 .../src/test/resources/mysql/setup.sql             |  27 ++
 .../pulsar/canal-mysql-source-config.yaml          |  26 ++
 .../test/resources/pulsar/start_canal_connector.sh |  26 ++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 .../json/canal/CanalJsonDeserializationSchema.java |   8 +-
 .../json/canal/CanalJsonSerDeSchemaTest.java       |   2 +-
 26 files changed, 1356 insertions(+), 136 deletions(-)

diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md
index 74a183c73..308cb010b 100644
--- a/docs/en/Connector-v2-release-state.md
+++ b/docs/en/Connector-v2-release-state.md
@@ -67,7 +67,7 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex
 | [OssFile](connector-v2/source/OssFile.md)                   | Source | Beta   | 2.2.0-beta      |
 | [Phoenix](connector-v2/sink/Phoenix.md)                     | Sink   | Beta   | 2.2.0-beta      |
 | [Phoenix](connector-v2/source/Phoenix.md)                   | Source | Beta   | 2.2.0-beta      |
-| [Pulsar](connector-v2/source/pulsar.md)                     | Source | Beta   | 2.2.0-beta      |
+| [Pulsar](connector-v2/source/Pulsar.md)                     | Source | Beta   | 2.2.0-beta      |
 | [RabbitMQ](connector-v2/sink/Rabbitmq.md)                   | Sink   | Beta   | 2.3.0           |
 | [RabbitMQ](connector-v2/source/Rabbitmq.md)                 | Source | Beta   | 2.3.0           |
 | [Redis](connector-v2/sink/Redis.md)                         | Sink   | Beta   | 2.2.0-beta      |
diff --git a/docs/en/connector-v2/source/pulsar.md b/docs/en/connector-v2/source/Pulsar.md
similarity index 91%
rename from docs/en/connector-v2/source/pulsar.md
rename to docs/en/connector-v2/source/Pulsar.md
index 3bdf8d235..9eba505e7 100644
--- a/docs/en/connector-v2/source/pulsar.md
+++ b/docs/en/connector-v2/source/Pulsar.md
@@ -37,6 +37,7 @@ Source connector for Apache Pulsar.
 | cursor.stop.timestamp    | Long    | No       | -             |
 | schema                   | config  | No       | -             |
 | common-options           |         | no       | -             |
+| format                   | String  | no       | json          |
 
 ### topic [String]
 
@@ -126,9 +127,12 @@ Stop from the specified epoch timestamp (in milliseconds).
 
 ### schema [Config]
 
-#### fields [Config]
+The structure of the data, including field names and field types.
+For details, refer to [Schema-Feature](../../concept/schema-feature.md).
 
-the schema fields of upstream data.
+### format [String]
+
+Data format. The default format is json. For details, refer to [formats](../formats).
 
 ### common options
 
@@ -141,7 +145,7 @@ source {
   Pulsar {
   	topic = "example"
   	subscription.name = "seatunnel"
-    client.service-url = "localhost:pulsar://localhost:6650"
+    client.service-url = "pulsar://localhost:6650"
     admin.service-url = "http://my-broker.example.com:8080"
     result_table_name = "test"
   }
@@ -154,3 +158,7 @@ source {
 
 - Add Pulsar Source Connector
 
+### next version
+
+- [Feature] Add Pulsar canal-format and e2e ([4111](https://github.com/apache/incubator-seatunnel/pull/4111))
+
diff --git a/release-note.md b/release-note.md
index abdc5ac14..1ee3b4986 100644
--- a/release-note.md
+++ b/release-note.md
@@ -54,6 +54,7 @@
 - [Connector-V2] [Jdbc] add the log for sql and update some style (#4475)
 - [Connector-V2] [Jdbc] Fix the table name is not automatically obtained when multiple tables (#4514)
 - [Connector-V2] [S3 & Kafka] Delete unavailable S3 & Kafka Catalogs (#4477)
+- [Connector-V2] [Pulsar] Support Canal Format
 
 ### CI
 
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
index 15184e88e..181d03555 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
@@ -233,14 +233,15 @@ public class JsonUtils {
     }
 
     public static ObjectNode parseObject(String text) {
+        return parseObject(text.getBytes());
+    }
+
+    public static ObjectNode parseObject(byte[] content) {
         try {
-            if (text.isEmpty()) {
-                return parseObject(text, ObjectNode.class);
-            } else {
-                return (ObjectNode) OBJECT_MAPPER.readTree(text);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException("String json deserialization exception.", e);
+            return (ObjectNode) OBJECT_MAPPER.readTree(content);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "String json deserialization exception." + new String(content), e);
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-pulsar/pom.xml b/seatunnel-connectors-v2/connector-pulsar/pom.xml
index 6fcad7f58..12bbec591 100644
--- a/seatunnel-connectors-v2/connector-pulsar/pom.xml
+++ b/seatunnel-connectors-v2/connector-pulsar/pom.xml
@@ -30,7 +30,7 @@
     <name>SeaTunnel : Connectors V2 : Pulsar</name>
 
     <properties>
-        <pulsar.version>2.8.0</pulsar.version>
+        <pulsar.version>2.11.0</pulsar.version>
         <commons-lang3.version>3.4</commons-lang3.version>
     </properties>
 
@@ -100,5 +100,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
index 21ebcaef8..413693226 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java
@@ -17,8 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.pulsar.config;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.format.json.JsonFormatFactory;
 
 public class SourceProperties {
 
@@ -168,6 +171,27 @@ public class SourceProperties {
                     .noDefaultValue()
                     .withDescription("Stop from the specified epoch timestamp (in milliseconds)");
 
+    public static final Option<Config> SCHEMA =
+            Options.key("schema")
+                    .objectType(Config.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "The structure of the data, including field names and field types.");
+
+    public static final Option<String> FORMAT =
+            Options.key("format")
+                    .stringType()
+                    .defaultValue(JsonFormatFactory.IDENTIFIER)
+                    .withDescription(
+                            "Data format. The default format is json. Optional text format. The default field separator is \", \". "
+                                    + "If you customize the delimiter, add the \"field_delimiter\" option.");
+
+    public static final Option<String> FIELD_DELIMITER =
+            Options.key("field_delimiter")
+                    .stringType()
+                    .defaultValue(",")
+                    .withDescription("Customize the field delimiter for data format.");
+
     /** Startup mode for the pulsar consumer, see {@link #CURSOR_STARTUP_MODE}. */
     public enum StartMode {
         /** Start from the earliest cursor possible. */
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
index d41077922..cf5972aa1 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java
@@ -28,11 +28,12 @@ import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
@@ -48,9 +49,14 @@ import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor
 import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicListDiscoverer;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.format.PulsarCanalDecorator;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader;
 import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.JsonFormatFactory;
+import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+import org.apache.seatunnel.format.json.canal.CanalJsonFormatFactory;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
 
 import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
 
@@ -70,9 +76,11 @@ import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProp
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STARTUP_TIMESTAMP;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_MODE;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CURSOR_STOP_TIMESTAMP;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_BATCH_SIZE;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_INTERVAL;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.POLL_TIMEOUT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SCHEMA;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.SUBSCRIPTION_NAME;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.StartMode;
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
@@ -80,10 +88,13 @@ import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProp
 import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC_PATTERN;
 
 @AutoService(SeaTunnelSource.class)
-public class PulsarSource<T>
-        implements SeaTunnelSource<T, PulsarPartitionSplit, PulsarSplitEnumeratorState>,
+public class PulsarSource
+        implements SeaTunnelSource<SeaTunnelRow, PulsarPartitionSplit, PulsarSplitEnumeratorState>,
                 SupportParallelism {
-    private DeserializationSchema<T> deserialization;
+
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    private SeaTunnelRowType typeInfo;
 
     private PulsarAdminConfig adminConfig;
     private PulsarClientConfig clientConfig;
@@ -291,11 +302,31 @@ public class PulsarSource<T>
     }
 
     private void setDeserialization(Config config) {
-        String format = config.getString("format");
-        // TODO: format SPI
-        SeaTunnelRowType rowType = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
-        deserialization =
-                (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
+        if (config.hasPath(SCHEMA.key())) {
+            typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+            String format = FORMAT.defaultValue();
+            if (config.hasPath(FORMAT.key())) {
+                format = config.getString(FORMAT.key());
+            }
+            switch (format) {
+                case JsonFormatFactory.IDENTIFIER:
+                    deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+                    break;
+                case CanalJsonFormatFactory.IDENTIFIER:
+                    deserializationSchema =
+                            new PulsarCanalDecorator(
+                                    CanalJsonDeserializationSchema.builder(typeInfo)
+                                            .setIgnoreParseErrors(true)
+                                            .build());
+                    break;
+                default:
+                    throw new SeaTunnelJsonFormatException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+            }
+        } else {
+            typeInfo = CatalogTableUtil.buildSimpleTextSchema();
+            this.deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+        }
     }
 
     @Override
@@ -306,19 +337,19 @@ public class PulsarSource<T>
     }
 
     @Override
-    public SeaTunnelDataType<T> getProducedType() {
-        return deserialization.getProducedType();
+    public SeaTunnelRowType getProducedType() {
+        return this.typeInfo;
     }
 
     @Override
-    public SourceReader<T, PulsarPartitionSplit> createReader(SourceReader.Context readerContext)
-            throws Exception {
+    public SourceReader<SeaTunnelRow, PulsarPartitionSplit> createReader(
+            SourceReader.Context readerContext) throws Exception {
         return new PulsarSourceReader<>(
                 readerContext,
                 clientConfig,
                 consumerConfig,
                 startCursor,
-                deserialization,
+                deserializationSchema,
                 pollTimeout,
                 pollInterval,
                 batchSize);
@@ -352,6 +383,6 @@ public class PulsarSource<T>
                 startCursor,
                 stopCursor,
                 consumerConfig.getSubscriptionName(),
-                checkpointState.assignedPartitions());
+                checkpointState.getAssignedPartitions());
     }
 }
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
index bc570513f..ddf6cb2d6 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
@@ -189,11 +189,12 @@ public class PulsarSplitEnumerator
     }
 
     private Set<TopicPartition> getNewPartitions(Set<TopicPartition> fetchedPartitions) {
-        Consumer<TopicPartition> dedupOrMarkAsRemoved = fetchedPartitions::remove;
-        assignedPartitions.forEach(dedupOrMarkAsRemoved);
+        Consumer<TopicPartition> duplicateOrMarkAsRemoved = fetchedPartitions::remove;
+        assignedPartitions.forEach(duplicateOrMarkAsRemoved);
         pendingPartitionSplits.forEach(
                 (reader, splits) ->
-                        splits.forEach(split -> dedupOrMarkAsRemoved.accept(split.getPartition())));
+                        splits.forEach(
+                                split -> duplicateOrMarkAsRemoved.accept(split.getPartition())));
 
         if (!fetchedPartitions.isEmpty()) {
             LOG.info("Discovered new partitions: {}", fetchedPartitions);
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java
index 8df46e7b4..b000da9fb 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumeratorState.java
@@ -30,7 +30,7 @@ public class PulsarSplitEnumeratorState implements Serializable {
         this.assignedPartitions = assignedPartitions;
     }
 
-    public Set<TopicPartition> assignedPartitions() {
+    public Set<TopicPartition> getAssignedPartitions() {
         return assignedPartitions;
     }
 }
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/format/PulsarCanalDecorator.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/format/PulsarCanalDecorator.java
new file mode 100644
index 000000000..d6cc201a4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/format/PulsarCanalDecorator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source.format;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * for pulsar-connector, the data format is
+ *
+ * <p>{ "id":0, "message":"[{pulsar-data based on canal}]", "timestamp":"" }
+ */
+public class PulsarCanalDecorator implements DeserializationSchema<SeaTunnelRow> {
+
+    private static final String MESSAGE = "message";
+    private static final String FIELD_DATA = "data";
+    private static final String FIELD_OLD = "old";
+    public static final String COLUMN_NAME = "columnName";
+    public static final String COLUMN_VALUE = "columnValue";
+    public static final String COLUMN_INDEX = "index";
+
+    private final CanalJsonDeserializationSchema canalJsonDeserializationSchema;
+
+    public PulsarCanalDecorator(CanalJsonDeserializationSchema canalJsonDeserializationSchema) {
+        this.canalJsonDeserializationSchema = canalJsonDeserializationSchema;
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
+        ObjectNode pulsarCanal = JsonUtils.parseObject(message);
+        ArrayNode canalList = JsonUtils.parseArray(pulsarCanal.get(MESSAGE).asText());
+        Iterator<JsonNode> canalIterator = canalList.elements();
+        while (canalIterator.hasNext()) {
+            JsonNode next = canalIterator.next();
+            // reconvert pulsar handler, reference to
+            // https://github.com/apache/pulsar/blob/master/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java
+            ObjectNode root = reconvertPulsarData((ObjectNode) next);
+            canalJsonDeserializationSchema.deserialize(root, out);
+        }
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return canalJsonDeserializationSchema.getProducedType();
+    }
+
+    private ObjectNode reconvertPulsarData(ObjectNode root) {
+        root.replace(FIELD_DATA, reconvert(root.get(FIELD_DATA)));
+        root.replace(FIELD_OLD, reconvert(root.get(FIELD_OLD)));
+        return root;
+    }
+
+    private JsonNode reconvert(JsonNode node) {
+        if (!(node instanceof ArrayNode) || node.size() <= 0) {
+            return node;
+        }
+        long firstColumn = node.get(0).get(COLUMN_INDEX).asLong();
+        ArrayNode arrayNode = JsonUtils.createArrayNode();
+        ObjectNode rowMap = JsonUtils.createObjectNode();
+        for (int i = 0; i < node.size(); i++) {
+            ObjectNode columnNode = (ObjectNode) node.get(i);
+            if (firstColumn == columnNode.get(COLUMN_INDEX).asLong()) {
+                arrayNode.add(rowMap);
+                rowMap = JsonUtils.createObjectNode();
+            }
+            rowMap.set(columnNode.get(COLUMN_NAME).asText(), columnNode.get(COLUMN_VALUE));
+        }
+        arrayNode.add(rowMap);
+        arrayNode.remove(0);
+        return arrayNode;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
index c008d4592..1ff0ddfb9 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSourceReader.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;
 
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.common.Handover;
@@ -111,19 +112,16 @@ public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSpl
         if (pulsarClient != null) {
             pulsarClient.close();
         }
-        splitReaders
-                .values()
-                .forEach(
-                        reader -> {
-                            try {
-                                reader.close();
-                            } catch (IOException e) {
-                                throw new PulsarConnectorException(
-                                        CommonErrorCode.READER_OPERATION_FAILED,
-                                        "Failed to close the split reader thread.",
-                                        e);
-                            }
-                        });
+        for (PulsarSplitReaderThread pulsarSplitReaderThread : splitReaders.values()) {
+            try {
+                pulsarSplitReaderThread.close();
+            } catch (IOException e) {
+                throw new PulsarConnectorException(
+                        CommonErrorCode.READER_OPERATION_FAILED,
+                        "Failed to close the split reader thread.",
+                        e);
+            }
+        }
     }
 
     @Override
@@ -167,25 +165,23 @@ public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSpl
 
     @Override
     public void addSplits(List<PulsarPartitionSplit> splits) {
-        splits.forEach(
-                split -> {
-                    splitStates.put(split.splitId(), split);
-                    PulsarSplitReaderThread splitReaderThread =
-                            createPulsarSplitReaderThread(split);
-                    try {
-                        splitReaderThread.setName(
-                                "Pulsar Source Data Consumer "
-                                        + split.getPartition().getPartition());
-                        splitReaderThread.open();
-                        splitReaders.put(split.splitId(), splitReaderThread);
-                        splitReaderThread.start();
-                    } catch (PulsarClientException e) {
-                        throw new PulsarConnectorException(
-                                CommonErrorCode.READER_OPERATION_FAILED,
-                                "Failed to start the split reader thread.",
-                                e);
-                    }
-                });
+        for (PulsarPartitionSplit split : splits) {
+            splitStates.put(split.splitId(), split);
+            PulsarSplitReaderThread splitReaderThread = createPulsarSplitReaderThread(split);
+            try {
+                splitReaderThread.setName(
+                        "Pulsar Source Data Consumer " + split.getPartition().getPartition());
+                splitReaderThread.open();
+                splitReaders.put(split.splitId(), splitReaderThread);
+                splitReaderThread.start();
+                LOG.info("PulsarSplitReaderThread = {} start", splitReaderThread.getName());
+            } catch (PulsarClientException e) {
+                throw new PulsarConnectorException(
+                        CommonErrorCode.READER_OPERATION_FAILED,
+                        "Failed to start the split reader thread.",
+                        e);
+            }
+        }
     }
 
     protected PulsarSplitReaderThread createPulsarSplitReaderThread(PulsarPartitionSplit split) {
@@ -203,6 +199,10 @@ public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSpl
     public void handleNoMoreElements(String splitId, MessageId messageId) {
         LOG.info("Reader received the split {} NoMoreElements event.", splitId);
         pendingCursorsToFinish.put(splitId, messageId);
+        // BOUNDED not trigger snapshot and notifyCheckpointComplete
+        if (context.getBoundedness() == Boundedness.BOUNDED) {
+            finishedSplits.add(splitId);
+        }
     }
 
     @Override
@@ -221,32 +221,35 @@ public class PulsarSourceReader<T> implements SourceReader<T, PulsarPartitionSpl
                     checkpointId);
             return;
         }
-        pendingCursors.forEach(
-                (splitId, messageId) -> {
-                    if (finishedSplits.contains(splitId)) {
-                        return;
-                    }
-                    try {
-                        splitReaders.get(splitId).committingCursor(messageId);
-
-                        if (pendingCursorsToFinish.containsKey(splitId)
-                                && pendingCursorsToFinish.get(splitId).compareTo(messageId) == 0) {
-                            finishedSplits.add(splitId);
-                            try {
-                                splitReaders.get(splitId).close();
-                            } catch (IOException e) {
-                                throw new PulsarConnectorException(
-                                        CommonErrorCode.READER_OPERATION_FAILED,
-                                        "Failed to close the split reader thread.",
-                                        e);
-                            }
-                        }
-                    } catch (PulsarClientException e) {
-                        throw new PulsarConnectorException(
-                                PulsarConnectorErrorCode.ACK_CUMULATE_FAILED,
-                                "pulsar consumer acknowledgeCumulative failed.",
-                                e);
-                    }
-                });
+        pendingCursors.forEach(this::committingCursor);
+    }
+
+    /** commit the cursor of consumer thread */
+    private void committingCursor(String splitId, MessageId messageId) {
+        if (finishedSplits.contains(splitId)) {
+            return;
+        }
+        try {
+            PulsarSplitReaderThread pulsarSplitReaderThread = splitReaders.get(splitId);
+            pulsarSplitReaderThread.committingCursor(messageId);
+
+            if (pendingCursorsToFinish.containsKey(splitId)
+                    && pendingCursorsToFinish.get(splitId).compareTo(messageId) == 0) {
+                finishedSplits.add(splitId);
+                try {
+                    pulsarSplitReaderThread.close();
+                } catch (IOException e) {
+                    throw new PulsarConnectorException(
+                            CommonErrorCode.READER_OPERATION_FAILED,
+                            "Failed to close the split reader thread.",
+                            e);
+                }
+            }
+        } catch (PulsarClientException e) {
+            throw new PulsarConnectorException(
+                    PulsarConnectorErrorCode.ACK_CUMULATE_FAILED,
+                    "pulsar consumer acknowledgeCumulative failed.",
+                    e);
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
index 7dfd0d595..301d62d96 100644
--- a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
+++ b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.java
@@ -100,6 +100,7 @@ public class PulsarSplitReaderThread extends Thread implements Closeable {
                 Thread.sleep(pollInterval);
             }
         } catch (Throwable t) {
+            LOG.error("Pulsar Consumer receive data error", t);
             handover.reportError(t);
         } finally {
             // make sure the PulsarConsumer is closed
@@ -107,6 +108,8 @@ public class PulsarSplitReaderThread extends Thread implements Closeable {
                 consumer.close();
             } catch (Throwable t) {
                 LOG.warn("Error while closing pulsar consumer", t);
+            } finally {
+                running = false;
             }
         }
     }
diff --git a/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarCanalDecoratorTest.java b/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarCanalDecoratorTest.java
new file mode 100644
index 000000000..0076bbbdb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-pulsar/src/test/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarCanalDecoratorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.source.format.PulsarCanalDecorator;
+import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.Getter;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+public class PulsarCanalDecoratorTest {
+    private static final String json =
+            "{"
+                    + "  \"id\": 3,\n"
+                    + "  \"message\": \"[{\\\"data\\\":[{\\\"isKey\\\":\\\"1\\\",\\\"isNull\\\":\\\"0\\\",\\\"index\\\":\\\"0\\\",\\\"mysqlType\\\":\\\"INTEGER\\\",\\\"columnName\\\":\\\"id\\\",\\\"columnValue\\\":\\\"109\\\",\\\"updated\\\":\\\"0\\\"},{\\\"isKey\\\":\\\"0\\\",\\\"isNull\\\":\\\"0\\\",\\\"index\\\":\\\"1\\\",\\\"mysqlType\\\":\\\"VARCHAR(255)\\\",\\\"columnName\\\":\\\"name\\\",\\\"columnValue\\\":\\\"spare tire\\\",\\\"updated\\\":\\\"0\\\"},{\\\"isKey\\\":\\\"0\\\",\\\ [...]
+                    + "  \"timestamp\": \"2023-04-02 05:06:58\""
+                    + "}";
+
+    @Test
+    void decoder() throws IOException {
+        String[] fieldNames = new String[] {"id", "name", "description", "weight"};
+        SeaTunnelDataType<?>[] dataTypes =
+                new SeaTunnelDataType[] {
+                    BasicType.LONG_TYPE,
+                    BasicType.STRING_TYPE,
+                    BasicType.STRING_TYPE,
+                    BasicType.STRING_TYPE
+                };
+
+        SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, dataTypes);
+
+        CanalJsonDeserializationSchema canalJsonDeserializationSchema =
+                CanalJsonDeserializationSchema.builder(seaTunnelRowType).build();
+        PulsarCanalDecorator pulsarCanalDecorator =
+                new PulsarCanalDecorator(canalJsonDeserializationSchema);
+
+        SimpleCollector simpleCollector = new SimpleCollector();
+        pulsarCanalDecorator.deserialize(json.getBytes(StandardCharsets.UTF_8), simpleCollector);
+        Assertions.assertFalse(simpleCollector.getList().isEmpty());
+        for (SeaTunnelRow seaTunnelRow : simpleCollector.list) {
+            for (Object field : seaTunnelRow.getFields()) {
+                Assertions.assertNotNull(field);
+            }
+        }
+    }
+
+    private static class SimpleCollector implements Collector<SeaTunnelRow> {
+        @Getter private List<SeaTunnelRow> list = new ArrayList<>();
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            list.add(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml
similarity index 52%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml
index 498a33054..2d9e31c33 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml
@@ -18,83 +18,97 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-e2e</artifactId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
 
-    <artifactId>seatunnel-connector-v2-e2e</artifactId>
-    <packaging>pom</packaging>
-    <name>SeaTunnel : E2E : Connector V2 :</name>
+    <artifactId>connector-pulsar-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Pulsar</name>
 
-    <modules>
-        <module>connector-assert-e2e</module>
-        <module>connector-jdbc-e2e</module>
-        <module>connector-redis-e2e</module>
-        <module>connector-cdc-sqlserver-e2e</module>
-        <module>connector-clickhouse-e2e</module>
-        <module>connector-starrocks-e2e</module>
-        <module>connector-influxdb-e2e</module>
-        <module>connector-amazondynamodb-e2e</module>
-        <module>connector-file-local-e2e</module>
-        <module>connector-file-sftp-e2e</module>
-        <module>connector-cassandra-e2e</module>
-        <module>connector-neo4j-e2e</module>
-        <module>connector-http-e2e</module>
-        <module>connector-rabbitmq-e2e</module>
-        <module>connector-kafka-e2e</module>
-        <module>connector-doris-e2e</module>
-        <module>connector-fake-e2e</module>
-        <module>connector-elasticsearch-e2e</module>
-        <module>connector-iotdb-e2e</module>
-        <module>connector-cdc-mysql-e2e</module>
-        <module>connector-iceberg-e2e</module>
-        <module>connector-iceberg-hadoop3-e2e</module>
-        <module>connector-tdengine-e2e</module>
-        <module>connector-datahub-e2e</module>
-        <module>connector-mongodb-e2e</module>
-        <module>connector-hbase-e2e</module>
-        <module>connector-maxcompute-e2e</module>
-        <module>connector-google-firestore-e2e</module>
-        <module>connector-rocketmq-e2e</module>
-        <module>connector-paimon-e2e</module>
-    </modules>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.seatunnel</groupId>
+                <artifactId>connector-cdc-mysql</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.seatunnel</groupId>
+                <artifactId>connector-jdbc</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-e2e-common</artifactId>
+            <artifactId>connector-pulsar</artifactId>
             <version>${project.version}</version>
-            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-13-starter</artifactId>
+            <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-15-starter</artifactId>
+            <artifactId>connector-fake</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-2-starter</artifactId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-3-starter</artifactId>
+            <artifactId>connector-cdc-mysql</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-starter</artifactId>
+            <artifactId>connector-cdc-mysql</artifactId>
             <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>mysql</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
new file mode 100644
index 000000000..716bd7dc9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.pulsar;
+
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+/**
+ * The Canal server produces MySQL change data into Pulsar and the SeaTunnel job consumes it.
+ * Reference: https://pulsar.apache.org/docs/2.11.x/io-canal-source/
+ */
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK},
+        disabledReason = "spark would ignore delete type")
+public class CanalToPulsarIT extends TestSuiteBase implements TestResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CanalToPulsarIT.class);
+
+    // ----------------------------------------------------------------------------
+    // mysql
+    private static final String MYSQL_HOST = "mysql.e2e";
+
+    private static final int MYSQL_PORT = 3306;
+    public static final String MYSQL_USER = "st_user";
+    public static final String MYSQL_PASSWORD = "seatunnel";
+
+    private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);
+
+    private final UniqueDatabase inventoryDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", "mysqlpw");
+
+    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+        MySqlContainer mySqlContainer =
+                new MySqlContainer(version)
+                        .withConfigurationOverride("mysql/server-gtids/my.cnf")
+                        .withSetupSQL("mysql/setup.sql")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(MYSQL_HOST)
+                        .withDatabaseName("canal")
+                        .withUsername(MYSQL_USER)
+                        .withPassword(MYSQL_PASSWORD)
+                        .withLogConsumer(new Slf4jLogConsumer(LOG));
+        mySqlContainer.withExposedPorts(MYSQL_PORT);
+        return mySqlContainer;
+    }
+
+    // ----------------------------------------------------------------------------
+    // postgres
+    private static final String PG_IMAGE = "postgres:alpine3.16";
+
+    private static final String PG_DRIVER_JAR =
+            "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
+
+    private static PostgreSQLContainer<?> POSTGRESQL_CONTAINER;
+
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+                                        + PG_DRIVER_JAR);
+
+                Assertions.assertEquals(0, extraCommands.getExitCode());
+            };
+
+    private void createPostgreSQLContainer() throws ClassNotFoundException {
+        POSTGRESQL_CONTAINER =
+                new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases("postgresql")
+                        .withExposedPorts(5432)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PG_IMAGE)));
+    }
+
+    /** Creates the PostgreSQL sink table; SQL failures are rethrown as RuntimeException. */
+    private void initializeJdbcTable() {
+        try (Connection connection =
+                        DriverManager.getConnection(
+                                POSTGRESQL_CONTAINER.getJdbcUrl(),
+                                POSTGRESQL_CONTAINER.getUsername(),
+                                POSTGRESQL_CONTAINER.getPassword());
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    "create table sink(\n"
+                            + "id INT NOT NULL PRIMARY KEY,\n"
+                            + "name varchar(255),\n"
+                            + "description varchar(255),\n"
+                            + "weight varchar(255)"
+                            + ")");
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing PostgreSql table failed!", e);
+        }
+    }
+
+    // ----------------------------------------------------------------------------
+    // canal
+    private static GenericContainer<?> CANAL_CONTAINER;
+
+    private static final String CANAL_DOCKER_IMAGE = "canal/canal-server:v1.1.2";
+
+    private static final String CANAL_HOST = "canal.e2e";
+
+    private void createCanalContainer() {
+        CANAL_CONTAINER =
+                new GenericContainer<>(CANAL_DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(CANAL_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(CANAL_DOCKER_IMAGE)));
+
+        CANAL_CONTAINER
+                .withEnv("canal.auto.scan", "false")
+                .withEnv("canal.destinations", "test")
+                .withEnv(
+                        "canal.instance.master.address",
+                        String.format("%s:%s", MYSQL_HOST, MYSQL_PORT))
+                .withEnv("canal.instance.dbUsername", MYSQL_USER)
+                .withEnv("canal.instance.dbPassword", MYSQL_PASSWORD)
+                .withEnv("canal.instance.connectionCharset", "UTF-8")
+                .withEnv("canal.instance.tsdb.enable", "true")
+                .withEnv("canal.instance.gtidon", "false");
+    }
+
+    // ----------------------------------------------------------------------------
+    // pulsar container
+    // Downloading the canal connector is slow, so this image bundles the one from apache/pulsar.
+    private static final String PULSAR_IMAGE_NAME = "laglangyue/pulsar_canal:2.3.1";
+
+    private static final String PULSAR_HOST = "pulsar.e2e";
+    private static final String TOPIC = "test-cdc_mds";
+
+    private static final Integer PULSAR_BROKER_PORT = 6650;
+    private static final Integer PULSAR_BROKER_HTTP_PORT = 8080;
+
+    private static GenericContainer<?> PULSAR_CONTAINER;
+
+    private void createPulsarContainer() {
+        PULSAR_CONTAINER =
+                new GenericContainer<>(DockerImageName.parse(PULSAR_IMAGE_NAME))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(PULSAR_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));
+        PULSAR_CONTAINER.withExposedPorts(PULSAR_BROKER_PORT, PULSAR_BROKER_HTTP_PORT);
+
+        // canal connectors config
+        PULSAR_CONTAINER.withCopyFileToContainer(
+                MountableFile.forClasspathResource("pulsar/canal-mysql-source-config.yaml"),
+                "/pulsar/conf/");
+        // start connectors cmd
+        PULSAR_CONTAINER.withCopyFileToContainer(
+                MountableFile.forClasspathResource("pulsar/start_canal_connector.sh"), "/pulsar/");
+        // wait for pulsar started
+        List<WaitStrategy> waitStrategies = new ArrayList<>();
+        waitStrategies.add(Wait.forLogMessage(".*pulsar entered RUNNING state.*", 1));
+        waitStrategies.add(Wait.forLogMessage(".*canal entered RUNNING state.*", 1));
+        final WaitAllStrategy compoundedWaitStrategy = new WaitAllStrategy();
+        waitStrategies.forEach(compoundedWaitStrategy::withStrategy);
+        PULSAR_CONTAINER.waitingFor(compoundedWaitStrategy);
+    }
+
+    /**
+     * Asserts once that the canal connector's topic exists in {@code public/default}.
+     *
+     * <p>No loop or sleep here: the caller polls this method through Awaitility, which owns the
+     * retry interval and the overall timeout.
+     */
+    private void waitForTopicCreated() throws PulsarClientException {
+        try (PulsarAdmin pulsarAdmin =
+                PulsarAdmin.builder()
+                        .serviceHttpUrl(
+                                String.format(
+                                        "http://%s:%s",
+                                        PULSAR_CONTAINER.getHost(),
+                                        PULSAR_CONTAINER.getMappedPort(PULSAR_BROKER_HTTP_PORT)))
+                        .build()) {
+            List<String> topics = pulsarAdmin.topics().getList("public/default");
+            boolean created = topics.stream().anyMatch(t -> StringUtils.contains(t, TOPIC));
+            Assertions.assertTrue(created, "Topic not created yet: " + TOPIC);
+        } catch (PulsarAdminException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @BeforeAll
+    @Override
+    public void startUp() throws ClassNotFoundException, InterruptedException {
+        LOG.info("The second stage: Starting Mysql containers...");
+        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+        LOG.info("Mysql Containers are started");
+
+        LOG.info("The third stage: Starting Canal containers...");
+        createCanalContainer();
+        Startables.deepStart(Stream.of(CANAL_CONTAINER)).join();
+        LOG.info("Canal Containers are started");
+
+        LOG.info("Starting Pulsar containers...");
+        createPulsarContainer();
+        Startables.deepStart(Stream.of(PULSAR_CONTAINER)).join();
+        LOG.info("Pulsar Containers are started");
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(5, TimeUnit.MINUTES)
+                .untilAsserted(this::waitForTopicCreated);
+        // before ddl, the pulsar_canal connector should be started
+        inventoryDatabase.createAndInitialize();
+        // wait pulsar get data from canal server
+        Thread.sleep(10 * 1000);
+        LOG.info("The fourth stage: Starting PostgresSQL container...");
+        createPostgreSQLContainer();
+        Startables.deepStart(Stream.of(POSTGRESQL_CONTAINER)).join();
+        Class.forName(POSTGRESQL_CONTAINER.getDriverClassName());
+        LOG.info("postgresql Containers are started");
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(5, TimeUnit.MINUTES)
+                .untilAsserted(this::initializeJdbcTable);
+    }
+
+    @Override
+    public void tearDown() {
+        // close every container started in startUp(), including PostgreSQL
+        Stream.of(MYSQL_CONTAINER, CANAL_CONTAINER, PULSAR_CONTAINER, POSTGRESQL_CONTAINER)
+                .forEach(GenericContainer::close);
+    }
+
+    @TestTemplate
+    void testCanalFormatMessages(TestContainer container)
+            throws IOException, InterruptedException, SQLException {
+        Container.ExecResult execResult = container.executeJob("/cdc_canal_pulsar_to_pg.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        List<List<Object>> actual = new ArrayList<>();
+        try (Connection connection =
+                DriverManager.getConnection(
+                        POSTGRESQL_CONTAINER.getJdbcUrl(),
+                        POSTGRESQL_CONTAINER.getUsername(),
+                        POSTGRESQL_CONTAINER.getPassword())) {
+            try (Statement statement = connection.createStatement()) {
+                ResultSet resultSet = statement.executeQuery("SELECT * FROM sink ORDER BY id");
+                while (resultSet.next()) {
+                    List<Object> row =
+                            Arrays.asList(
+                                    resultSet.getInt("id"),
+                                    resultSet.getString("name"),
+                                    resultSet.getString("description"),
+                                    resultSet.getString("weight"));
+                    actual.add(row);
+                }
+            }
+        }
+        List<List<Object>> expected =
+                Lists.newArrayList(
+                        Arrays.asList(101, "scooter", "Small 2-wheel scooter", "4.56"),
+                        Arrays.asList(102, "car battery", "12V car battery", "8.1"),
+                        Arrays.asList(
+                                103,
+                                "12-pack drill bits",
+                                "12-pack of drill bits with sizes ranging from #40 to #3",
+                                "0.8"),
+                        Arrays.asList(104, "hammer", "12oz carpenter's hammer", "0.75"),
+                        Arrays.asList(105, "hammer", "14oz carpenter's hammer", "0.875"),
+                        Arrays.asList(106, "hammer", "16oz carpenter's hammer", "1.0"),
+                        Arrays.asList(107, "rocks", "box of assorted rocks", "7.88"),
+                        Arrays.asList(108, "jacket", "water resistent black wind breaker", "0.1"));
+        Assertions.assertIterableEquals(expected, actual);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
new file mode 100644
index 000000000..b1ea69efa
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.pulsar;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeDataGenerator;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test for the Pulsar source in batch mode.
+ *
+ * <p>{@link #startUp()} starts a Pulsar container and publishes generated rows, serialized as
+ * JSON, to {@link #TOPIC}. {@link #testPulsarBatch(TestContainer)} then runs the {@code
+ * /batch_pulsar_to_console.conf} job and expects it to exit with code 0.
+ */
+@Slf4j
+public class PulsarBatchIT extends TestSuiteBase implements TestResource {
+
+    private static final String PULSAR_IMAGE_NAME = "apachepulsar/pulsar:2.3.1";
+    public static final String PULSAR_HOST = "pulsar.batch.e2e";
+    public static final String TOPIC = "topic-it";
+    private PulsarContainer pulsarContainer;
+    private PulsarClient client;
+    private Producer<byte[]> producer;
+
+    /** Row layout of the generated records; field names match the schema in the job config. */
+    private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {
+                        "c_map",
+                        "c_array",
+                        "c_string",
+                        "c_boolean",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_bytes",
+                        "c_date",
+                        "c_timestamp"
+                    },
+                    new SeaTunnelDataType[] {
+                        new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+                        ArrayType.INT_ARRAY_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.BOOLEAN_TYPE,
+                        BasicType.BYTE_TYPE,
+                        BasicType.SHORT_TYPE,
+                        BasicType.INT_TYPE,
+                        BasicType.LONG_TYPE,
+                        BasicType.FLOAT_TYPE,
+                        BasicType.DOUBLE_TYPE,
+                        new DecimalType(38, 10),
+                        PrimitiveByteArrayType.INSTANCE,
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE
+                    });
+
+    /**
+     * Starts the Pulsar container on the shared test network and retries {@link #initTopic()}
+     * for up to 180 seconds until the topic has been populated.
+     *
+     * @throws Exception if the container cannot be started or the topic cannot be initialized
+     */
+    @Override
+    @BeforeAll
+    public void startUp() throws Exception {
+        pulsarContainer =
+                new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(PULSAR_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));
+
+        Startables.deepStart(Stream.of(pulsarContainer)).join();
+        Awaitility.given()
+                .ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initTopic);
+    }
+
+    /**
+     * Releases the producer, the client and the container, in that order.
+     *
+     * <p>The producer and client are closed before the container so that they can still reach
+     * the broker while shutting down. Each resource is closed even if closing an earlier one
+     * fails, and resources that were never created are skipped.
+     *
+     * @throws Exception if closing any of the resources fails
+     */
+    @Override
+    @AfterAll
+    public void tearDown() throws Exception {
+        try {
+            if (producer != null) {
+                producer.close();
+            }
+        } finally {
+            try {
+                if (client != null) {
+                    client.close();
+                }
+            } finally {
+                if (pulsarContainer != null) {
+                    pulsarContainer.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the client and producer (if not already created) and publishes the test data.
+     *
+     * <p>This method is retried by Awaitility in {@link #startUp()}, so objects created by an
+     * earlier, partially failed attempt are reused instead of being replaced and leaked.
+     *
+     * @throws PulsarClientException if the client or producer cannot be created
+     */
+    private void initTopic() throws PulsarClientException {
+        if (client == null) {
+            client =
+                    PulsarClient.builder()
+                            .serviceUrl(pulsarContainer.getPulsarBrokerUrl())
+                            .build();
+        }
+        if (producer == null) {
+            producer = client.newProducer(Schema.BYTES).topic(TOPIC).create();
+        }
+        produceData();
+    }
+
+    /**
+     * Requests 100 rows from the fake data generator, serializes each collected row with {@link
+     * JsonSerializationSchema} and sends it to the topic.
+     *
+     * @throws RuntimeException wrapping the {@link PulsarClientException} if sending fails
+     */
+    private void produceData() {
+
+        try {
+            FakeConfig fakeConfig = FakeConfig.buildWithConfig(ConfigFactory.empty());
+            FakeDataGenerator fakeDataGenerator =
+                    new FakeDataGenerator(SEATUNNEL_ROW_TYPE, fakeConfig);
+            SimpleCollector simpleCollector = new SimpleCollector();
+            fakeDataGenerator.collectFakedRows(100, simpleCollector);
+            JsonSerializationSchema jsonSerializationSchema =
+                    new JsonSerializationSchema(SEATUNNEL_ROW_TYPE);
+            for (SeaTunnelRow seaTunnelRow : simpleCollector.getList()) {
+                producer.send(jsonSerializationSchema.serialize(seaTunnelRow));
+            }
+        } catch (PulsarClientException e) {
+            throw new RuntimeException("produce data error", e);
+        }
+    }
+
+    /** Collector that simply stores every collected row in an in-memory list. */
+    private static class SimpleCollector implements Collector<SeaTunnelRow> {
+        @Getter private final List<SeaTunnelRow> list = new ArrayList<>();
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            list.add(record);
+        }
+
+        /** No checkpoint lock is provided by this collector; always returns {@code null}. */
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+
+    /**
+     * Runs the batch Pulsar job in the given engine container and expects exit code 0.
+     *
+     * @param container the engine container supplied by the test template
+     */
+    @TestTemplate
+    void testPulsarBatch(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/batch_pulsar_to_console.conf");
+        // Expected value goes first; stderr is attached so a failing job is diagnosable.
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
new file mode 100644
index 000000000..59d2efc86
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
@@ -0,0 +1,162 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file defines a batch job that reads JSON messages from Pulsar and validates them with the Assert sink
+######
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 5000
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Pulsar {
+    client.service-url = "pulsar://pulsar.batch.e2e:6650"
+    admin.service-url = "http://pulsar.batch.e2e:8080"
+    subscription.name = "e2e"
+    topic = "topic-it"
+    cursor.startup.mode = "EARLIEST"
+    cursor.stop.mode = "LATEST"
+    format = json
+    result_table_name = "pulsar_canal"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = c_string
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_boolean
+            field_type = boolean
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_float
+            field_type = float
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_double
+            field_type = double
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_tinyint
+            field_type = byte
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_smallint
+            field_type = short
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_int
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_bigint
+            field_type = long
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_date
+            field_type = date
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_timestamp
+            field_type = timestamp
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/cdc_canal_pulsar_to_pg.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/cdc_canal_pulsar_to_pg.conf
new file mode 100644
index 000000000..c287be676
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/cdc_canal_pulsar_to_pg.conf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file defines a batch job that reads Canal JSON change events from Pulsar and writes them to PostgreSQL through the Jdbc sink
+######
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Pulsar {
+    client.service-url = "pulsar://pulsar.e2e:6650"
+    admin.service-url = "http://pulsar.e2e:8080"
+    subscription.name = "e2e_canal_cdc_subscription"
+    topic = "test-cdc_mds"
+    cursor.startup.mode = "EARLIEST"
+    cursor.stop.mode = "LATEST"
+    format = canal_json
+    result_table_name = "pulsar_canal"
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        description = "string"
+        weight = "string"
+      }
+    }
+  }
+}
+
+sink {
+  Jdbc {
+    driver = org.postgresql.Driver
+    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+    user = test
+    password = test
+    generate_sink_sql = true
+    database = public
+    table = sink
+    primary_keys = ["id"]
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/ddl/canal.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/ddl/canal.sql
new file mode 100644
index 000000000..ed75e315b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/ddl/canal.sql
@@ -0,0 +1,47 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  canal
+-- ----------------------------------------------------------------------------------------------------------------
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products
+(
+    id          INTEGER      NOT NULL AUTO_INCREMENT PRIMARY KEY,
+    name        VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+    description VARCHAR(512),
+    weight      VARCHAR(512)
+);
+ALTER TABLE products
+    AUTO_INCREMENT = 101;
+
+INSERT INTO products
+VALUES (default, "scooter", "Small 2-wheel scooter", "3.14"),
+       (default, "car battery", "12V car battery", "8.1"),
+       (default, "12-pack drill bits", "12-pack of drill bits with sizes ranging from #40 to #3", "0.8"),
+       (default, "hammer", "12oz carpenter's hammer", "0.75"),
+       (default, "hammer", "14oz carpenter's hammer", "0.875"),
+       (default, "hammer", "16oz carpenter's hammer", "1.0"),
+       (default, "rocks", "box of assorted rocks", "5.3"),
+       (default, "jacket", "water resistent black wind breaker", "0.1"),
+       (default, "spare tire", "24 inch spare tire", "22.2");
+
+UPDATE products SET weight = '4.56' WHERE name = 'scooter';
+UPDATE products SET weight = '7.88' WHERE name = 'rocks';
+
+DELETE FROM products WHERE name  = "spare tire";
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/server-gtids/my.cnf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/server-gtids/my.cnf
new file mode 100644
index 000000000..2a0e40ba1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/server-gtids/my.cnf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but would
+# be longer on a production system. Row-level info is required for ingest to work.
+# Server ID is required, but this will vary on production systems
+server-id         = 223344
+log_bin           = mysql-bin
+binlog_format     = row
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/setup.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/setup.sql
new file mode 100644
index 000000000..e2b73c3ec
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/mysql/setup.sql
@@ -0,0 +1,27 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- In production you would almost certainly restrict the replication user to the follower (slave) machine,
+-- to prevent other clients from accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'st_user' - all privileges required by the snapshot reader AND binlog reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES  ON *.* TO 'st_user'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/canal-mysql-source-config.yaml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/canal-mysql-source-config.yaml
new file mode 100644
index 000000000..af6e3f7bd
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/canal-mysql-source-config.yaml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+configs:
+  zkServers: ""
+  batchSize: "5120"
+  destination: "test"
+  username: ""
+  password: ""
+  cluster: false
+  singleHostname: "canal.e2e"
+  singlePort: "11111"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/start_canal_connector.sh b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/start_canal_connector.sh
new file mode 100644
index 000000000..de6508fbb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsar/start_canal_connector.sh
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+./bin/pulsar-admin source localrun \
+   --archive ./connectors/pulsar-io-canal-2.3.0.nar \
+   --classname org.apache.pulsar.io.canal.CanalStringSource \
+   --tenant public \
+   --namespace default \
+   --name canal \
+   --destination-topic-name test-cdc_mds \
+   --source-config-file /pulsar/conf/canal-mysql-source-config.yaml \
+   --parallelism 1
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 498a33054..070ab6e93 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -56,6 +56,7 @@
         <module>connector-maxcompute-e2e</module>
         <module>connector-google-firestore-e2e</module>
         <module>connector-rocketmq-e2e</module>
+        <module>connector-pulsar-e2e</module>
         <module>connector-paimon-e2e</module>
     </modules>
 
diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
index 948c0e91e..44cd6a9a8 100644
--- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java
@@ -120,6 +120,10 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea
         }
         ObjectNode jsonNode = (ObjectNode) convertBytes(message);
         assert jsonNode != null;
+        deserialize(jsonNode, out);
+    }
+
+    public void deserialize(ObjectNode jsonNode, Collector<SeaTunnelRow> out) {
         if (database != null
                 && !databasePattern.matcher(jsonNode.get(FIELD_DATABASE).asText()).matches()) {
             return;
@@ -137,7 +141,7 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea
             }
             throw new SeaTunnelJsonFormatException(
                     CommonErrorCode.JSON_OPERATION_FAILED,
-                    format("Null data value \"%s\" Cannot send downstream", new String(message)));
+                    format("Null data value \"%s\" Cannot send downstream", jsonNode));
         }
         if (OP_INSERT.equals(type)) {
             for (int i = 0; i < dataNode.size(); i++) {
@@ -179,7 +183,7 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea
                         CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                         format(
                                 "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
-                                type, new String(message)));
+                                type, jsonNode.asText()));
             }
         }
     }
diff --git a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
index 16938c003..c7544f6db 100644
--- a/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
+++ b/seatunnel-formats/seatunnel-format-json/src/test/java/org/apache/seatunnel/format/json/canal/CanalJsonSerDeSchemaTest.java
@@ -65,7 +65,7 @@ public class CanalJsonSerDeSchemaTest {
                 createCanalJsonDeserializationSchema(null, null);
         final SimpleCollector collector = new SimpleCollector();
 
-        deserializationSchema.deserialize(null, collector);
+        deserializationSchema.deserialize((byte[]) null, collector);
         assertEquals(0, collector.list.size());
     }