You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by "lightzhao (via GitHub)" <gi...@apache.org> on 2023/01/23 22:56:16 UTC

[GitHub] [incubator-seatunnel] lightzhao opened a new pull request, #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

lightzhao opened a new pull request, #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   Improve pulsar deserialization
   1.Add format and field_delimiter configuration.
   2.Rich deserialization methods.
   
   ## Check list
   
   * [ ] Code changed are covered with tests, or it does not need tests for reason:
   * [ ] If any new Jar binary package adding in your PR, please add License Notice according
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update change log that in connector document. For more details you can refer to [connector-v2](https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector-v2)
     2. Update [plugin-mapping.properties](https://github.com/apache/incubator-seatunnel/blob/dev/plugin-mapping.properties) and add new connector information in it
     3. Update the pom file of [seatunnel-dist](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml)
   * [ ] Update the [`release-note`](https://github.com/apache/incubator-seatunnel/blob/dev/release-note.md).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1489566567

   Hello everyone, who has time to help review this pr. thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1084845437


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java:
##########
@@ -226,10 +234,37 @@ private void setPartitionDiscoverer(Config config) {
     }
 
     private void setDeserialization(Config config) {
-        String format = config.getString("format");
-        // TODO: format SPI
-        SeaTunnelRowType rowType = SeaTunnelSchema.buildWithConfig(config.getConfig(SeaTunnelSchema.SCHEMA.key())).getSeaTunnelRowType();
-        deserialization = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
+        String schemaKey = SeaTunnelSchema.SCHEMA.key();
+        if (config.hasPath(schemaKey)) {
+            Config schema = config.getConfig(schemaKey);
+            SeaTunnelRowType rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+            String format = DEFAULT_FORMAT;
+            if (config.hasPath(FORMAT.key())) {
+                format = config.getString(FORMAT.key());
+            }
+            if (DEFAULT_FORMAT.equals(format)) {
+                deserialization = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
+            } else if (TEXT_FORMAT.equals(format)) {
+                String delimiter = DEFAULT_FIELD_DELIMITER;
+                if (config.hasPath(FIELD_DELIMITER.key())) {
+                    delimiter = config.getString(FIELD_DELIMITER.key());
+                }
+                deserialization = (DeserializationSchema<T>) TextDeserializationSchema.builder()
+                        .seaTunnelRowType(rowType)
+                        .delimiter(delimiter)
+                        .build();
+            } else {
+                // TODO: use format SPI
+                throw new SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,

Review Comment:
   Can't throw UNSUPPORTED_DATA_TYPE, CommonErrorCode.UNSUPPORTED_OPERATION maybe better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1439788103

   @Hisoka-X @ashulin @TyrantLucifer PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1537666375

   @Hisoka-X @TyrantLucifer @ashulin @hailin0 @EricJoy2048 PTAL, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1401705427

   > Please add change to change log in pulsar doc
   
   done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1488068492

   @Hisoka-X @TyrantLucifer @ashulin all checks passed , PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1198621428


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java:
##########
@@ -167,8 +174,20 @@ public class SourceProperties {
                     .longType()
                     .noDefaultValue()
                     .withDescription("Stop from the specified epoch timestamp (in milliseconds)");
+    public static final Option<String> FORMAT =
+            Options.key("format")
+                    .stringType()
+                    .noDefaultValue()

Review Comment:
   @lightzhao 



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>${slf4j.version}</version>

Review Comment:
   LGTM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1156728031


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>${slf4j.version}</version>

Review Comment:
   This class is needed when the test is running, and this log class is missing when ci.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1085092201


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/PulsarSource.java:
##########
@@ -226,10 +234,37 @@ private void setPartitionDiscoverer(Config config) {
     }
 
     private void setDeserialization(Config config) {
-        String format = config.getString("format");
-        // TODO: format SPI
-        SeaTunnelRowType rowType = SeaTunnelSchema.buildWithConfig(config.getConfig(SeaTunnelSchema.SCHEMA.key())).getSeaTunnelRowType();
-        deserialization = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
+        String schemaKey = SeaTunnelSchema.SCHEMA.key();
+        if (config.hasPath(schemaKey)) {
+            Config schema = config.getConfig(schemaKey);
+            SeaTunnelRowType rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+            String format = DEFAULT_FORMAT;
+            if (config.hasPath(FORMAT.key())) {
+                format = config.getString(FORMAT.key());
+            }
+            if (DEFAULT_FORMAT.equals(format)) {
+                deserialization = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);
+            } else if (TEXT_FORMAT.equals(format)) {
+                String delimiter = DEFAULT_FIELD_DELIMITER;
+                if (config.hasPath(FIELD_DELIMITER.key())) {
+                    delimiter = config.getString(FIELD_DELIMITER.key());
+                }
+                deserialization = (DeserializationSchema<T>) TextDeserializationSchema.builder()
+                        .seaTunnelRowType(rowType)
+                        .delimiter(delimiter)
+                        .build();
+            } else {
+                // TODO: use format SPI
+                throw new SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1484598738

   > Please fix ci
   
   Cannot perform ci check after update.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1491326031

   @TyrantLucifer @Hisoka-X @ashulin  PTAL , thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1536989662

   @Hisoka-X @TyrantLucifer @ashulin @hailin0 @EricJoy2048 PTAL, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1156727112


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>

Review Comment:
   deleted.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.17.6</version>

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [seatunnel] lightzhao closed pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao closed pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization
URL: https://github.com/apache/seatunnel/pull/3990


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1156727233


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.17.6</version>

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1085390498


##########
docs/en/connector-v2/source/pulsar.md:
##########
@@ -152,3 +163,4 @@ source {
 
 ### 2.3.0-beta 2022-10-20
 - Add Pulsar Source Connector
+- Improve pulsar deserialization([3990](https://github.com/apache/incubator-seatunnel/pull/3990))

Review Comment:
   It should in next version
   ```suggestion
   ### Next Version
   - Improve pulsar deserialization([3990](https://github.com/apache/incubator-seatunnel/pull/3990))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1452794091

   @ashulin PTAl.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1536983169

   @Hisoka-X @TyrantLucifer @hailin0 please approve ci. thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1537029208

   > Please ensure that your pull request's CI/CD process passes. Thank you.
   
   done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [seatunnel] lightzhao commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #3990:
URL: https://github.com/apache/seatunnel/pull/3990#discussion_r1203265191


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java:
##########
@@ -167,8 +174,20 @@ public class SourceProperties {
                     .longType()
                     .noDefaultValue()
                     .withDescription("Stop from the specified epoch timestamp (in milliseconds)");
+    public static final Option<String> FORMAT =
+            Options.key("format")
+                    .stringType()
+                    .noDefaultValue()

Review Comment:
   I've been a little busy recently, I'll update it in the next few days.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1510791148

   @EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin PTAL, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1488039523

   @Hisoka-X please approve ci check.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1486206483

   please approve ci check.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1445989193

   @EricJoy2048 @Hisoka-X @ashulin @TyrantLucifer PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1401440221

   Please add change to change log in plusar doc


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1498371206

    @EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin Please approve CI check,thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1160565401


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>${slf4j.version}</version>

Review Comment:
   @hailin0 PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1539285704

   Hi all ,Who has time to review , it's been a long time, #4382 pulsar sink is waiting for this pr.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1159349684


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>${slf4j.version}</version>

Review Comment:
   cc @hailin0 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1504432431

   @EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin PTAL,thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/seatunnel/pull/3990#issuecomment-1560336017

   #4111 This ability has been merged, I will close this pr.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1533957565

   please approve ci. thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1433019705

   Hi, please fix code style.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1512375880

   Is there a problem with e2e? 
   <img width="968" alt="image" src="https://user-images.githubusercontent.com/40714172/232661831-46fd1d72-1361-44e1-b6a9-7f88bf028877.png">
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1498534876

   > Check did not find a failure, why is it prompted 'Some checks were not successful', can you retry the check.
   
   Some check timout and be canceled. Never mind, ci already pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1498530898

   Check did not find a failure, why is it prompted 'Some checks were not successful', can you retry the check.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1189485369


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java:
##########
@@ -167,8 +174,20 @@ public class SourceProperties {
                     .longType()
                     .noDefaultValue()
                     .withDescription("Stop from the specified epoch timestamp (in milliseconds)");
+    public static final Option<String> FORMAT =
+            Options.key("format")
+                    .stringType()
+                    .noDefaultValue()

Review Comment:
   No defaule value? Or the default value is `json`? I suggest define the default value in option.  `public static final String DEFAULT_FORMAT = "json";` is not a good way.



##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/config/SourceProperties.java:
##########
@@ -167,8 +174,20 @@ public class SourceProperties {
                     .longType()
                     .noDefaultValue()
                     .withDescription("Stop from the specified epoch timestamp (in milliseconds)");
+    public static final Option<String> FORMAT =
+            Options.key("format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Data format. The default format is json. Optional text format. The default field separator is \", \". "
+                                    + "If you customize the delimiter, add the \"field_delimiter\" option.");
+
+    public static final Option<String> FIELD_DELIMITER =
+            Options.key("field_delimiter")
+                    .stringType()
+                    .noDefaultValue()

Review Comment:
   same as above.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/pulsarsource_text_to_console.conf:
##########
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+    execution.parallelism = 1
+    job.mode = "STREAMING"
+}
+
+source {
+  Pulsar {
+    topic = "persistent://public/default/test"
+    topic-pattern = ""
+    subscription.name = "seatunnel_pulsar_sub"
+    client.service-url = "pulsar://localhost:6650"
+    admin.service-url = "http://localhost:8080"
+    result_table_name = "pulsar_table"
+    schema = {
+      fields {
+           id = bigint
+           c_string = string
+      }
+    }
+    format = text
+    # The default field delimiter is ","
+    field_delimiter = ","
+  }
+}
+
+transform {
+}
+
+sink {
+  Console {}

Review Comment:
   I think you need check the data read from the source. You can add a LocalFile Sink Or Pulsar Sink and then read the data from the target file(or topic when you use Pulsar Sink). Check every row and column.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1478872237

   Please fix ci


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1156697058


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>

Review Comment:
   Why add this?



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>${slf4j.version}</version>

Review Comment:
   Why add this?



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>

Review Comment:
   Why add this?



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.17.6</version>

Review Comment:
   Use properties



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.17.6</version>

Review Comment:
   Use properties



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1434030131

   > Hi, please fix code style.
   
   done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1506230774

   hi,all,Who has time to help review this pr, it takes a long time, I can add e2e test to #4382 PulsarSink after completion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1473114623

   > Please ensure that your pull request's CI/CD process passes. Thank you.
   I have merged the latest dev branch, and the following link is always reported as abnormal, but the access to this link is normal. Do you know what the problem is?
   <img width="884" alt="image" src="https://user-images.githubusercontent.com/40714172/225812998-11036a58-10a3-4e5d-b1aa-a48eb8493fd3.png">
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1536984233

   > @Hisoka-X @TyrantLucifer @hailin0 please approve ci. thanks.
   
   Sorry for late response, done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1494030242

   Hello everyone, who has time to help review this pr. thanks. @EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1156726989


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>

Review Comment:
   deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1161379798


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <pulsar.version>2.8.0</pulsar.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-pulsar</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-all</artifactId>
+            <version>${pulsar.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.pulsar</groupId>
+                    <artifactId>pulsar-package-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-console</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>3.23.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>pulsar</artifactId>
+            <version>1.17.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+            <version>${slf4j.version}</version>

Review Comment:
   @hailin0 PTAL. thanks.
   This pr time is quite long, there are other pr waiting for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1402031881

   Please add schema / format config into e2e test case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1447421845

   > Please fix the ci error.
   I don't know why these two ci errors occur occasionally. My local compilation is all right.
   <img width="733" alt="image" src="https://user-images.githubusercontent.com/40714172/221727992-ffa0c16c-4960-46ee-acfb-c7a394e79275.png">
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#issuecomment-1432898729

   > Please add schema / format config into e2e test case.
   
   @Hisoka-X  add complete.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] ashulin commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "ashulin (via GitHub)" <gi...@apache.org>.
ashulin commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1149225283


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>

Review Comment:
   ```suggestion
   ```
   The root pom.xml will manage these properties uniformly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] lightzhao commented on a diff in pull request #3990: [Improve][Connector][PulsarSource]Improve pulsar deserialization

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #3990:
URL: https://github.com/apache/incubator-seatunnel/pull/3990#discussion_r1149924704


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/pom.xml:
##########
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-pulsar-e2e</artifactId>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>

Review Comment:
   ok, updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org