Posted to commits@seatunnel.apache.org by "lightzhao (via GitHub)" <gi...@apache.org> on 2023/03/21 10:52:13 UTC

[GitHub] [incubator-seatunnel] lightzhao opened a new pull request, #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

lightzhao opened a new pull request, #4382:
URL: https://github.com/apache/incubator-seatunnel/pull/4382

   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   Add Pulsar Sink Connector.
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or they do not need tests for the following reason:
   * [ ] If any new JAR binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   * [ ] If you are contributing the connector code, please check that the following files are updated:
     1. Update the change log in the connector document. For more details you can refer to [connector-v2](https://github.com/apache/incubator-seatunnel/tree/dev/docs/en/connector-v2)
     2. Update [plugin-mapping.properties](https://github.com/apache/incubator-seatunnel/blob/dev/plugin-mapping.properties) and add the new connector information to it
     3. Update the pom file of [seatunnel-dist](https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-dist/pom.xml)
   * [ ] Update the [`release-note`](https://github.com/apache/incubator-seatunnel/blob/dev/release-note.md).
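For checklist item 2, each entry in `plugin-mapping.properties` maps a plugin type and plugin name to a connector module. A minimal sketch of resolving such an entry with `java.util.Properties`; the two sample lines are written in the file's `seatunnel.<type>.<Name> = <module>` style but are assumptions, not copied from the repository, and `PluginMappingLookup` is a hypothetical helper, not SeaTunnel's own loader:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class PluginMappingLookup {

    // Assumed excerpt in the style of plugin-mapping.properties (not copied from the repo).
    static final String MAPPING =
            "seatunnel.source.Pulsar = connector-pulsar\n"
                    + "seatunnel.sink.Pulsar = connector-pulsar\n";

    // Resolves the connector module for a plugin type ("source" or "sink") and a plugin name.
    // Returns null when no entry exists for that type/name pair.
    static String moduleFor(String pluginType, String pluginName) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(MAPPING));
        } catch (IOException e) {
            // Cannot really happen with a StringReader, but load() declares it.
            throw new UncheckedIOException(e);
        }
        return props.getProperty("seatunnel." + pluginType + "." + pluginName);
    }

    public static void main(String[] args) {
        System.out.println(moduleFor("sink", "Pulsar"));
    }
}
```

Without the `seatunnel.sink.Pulsar` line, a lookup like this would return null, which is why the checklist asks for the mapping to be updated alongside the connector.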


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "Carl-Zhou-CN (via GitHub)" <gi...@apache.org>.
Carl-Zhou-CN commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1430810518


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarSinkState.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.state;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class PulsarSinkState implements Serializable {
+
+    /** The transaction id. */
+    private final TxnID txnID;

Review Comment:
   ok
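`PulsarSinkState` carries only the open transaction id, and it implements `Serializable` so that it can be snapshotted at a checkpoint and restored later. A minimal sketch of that round trip, assuming the state is persisted with plain Java serialization; `TxnIdStandIn` and `SinkStateSketch` are hypothetical stand-ins for Pulsar's `TxnID` (a pair of `mostSigBits`/`leastSigBits` longs) and for `PulsarSinkState`, used so the example runs without a Pulsar dependency:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

public class SinkStateRoundTrip {

    // Hypothetical stand-in for Pulsar's TxnID: a transaction id is a pair of longs.
    static final class TxnIdStandIn implements Serializable {
        private static final long serialVersionUID = 1L;
        final long mostSigBits;
        final long leastSigBits;

        TxnIdStandIn(long mostSigBits, long leastSigBits) {
            this.mostSigBits = mostSigBits;
            this.leastSigBits = leastSigBits;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TxnIdStandIn)) {
                return false;
            }
            TxnIdStandIn other = (TxnIdStandIn) o;
            return mostSigBits == other.mostSigBits && leastSigBits == other.leastSigBits;
        }

        @Override
        public int hashCode() {
            return Objects.hash(mostSigBits, leastSigBits);
        }
    }

    // Mirrors the shape of PulsarSinkState: one serializable field holding the transaction id.
    static final class SinkStateSketch implements Serializable {
        private static final long serialVersionUID = 1L;
        final TxnIdStandIn txnId;

        SinkStateSketch(TxnIdStandIn txnId) {
            this.txnId = txnId;
        }
    }

    // Serializes the state as a checkpoint would, then reads it back as a restore would.
    static SinkStateSketch roundTrip(SinkStateSketch state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SinkStateSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("state round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SinkStateSketch restored = roundTrip(new SinkStateSketch(new TxnIdStandIn(1L, 42L)));
        System.out.println(restored.txnId.leastSigBits);
    }
}
```

In the real class, Lombok's `@Data` generates `equals`/`hashCode`, so the restored state compares equal to the original without hand-written methods; the stand-in writes them out only to stay dependency-free.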





[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1647686975

   It's been a long time, @TyrantLucifer @hailin0 @EricJoy2048 @Hisoka-X PTAL.




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1436208092


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/fake_to_pulsar.conf:
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1

Review Comment:
   ```suggestion
    parallelism = 1
   ```
   
   https://github.com/apache/seatunnel/pull/6003
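The suggestion renames the env option from `execution.parallelism` to `parallelism` (see the linked PR #6003). A minimal sketch of a hypothetical option reader that prefers the new key; the fallback to the legacy key and the class name `ParallelismOption` are assumptions for illustration, and this thread does not say whether SeaTunnel keeps such a fallback:

```java
import java.util.Map;

public class ParallelismOption {

    static final String KEY = "parallelism";
    // Hypothetical legacy fallback; the review only states that the new key is "parallelism".
    static final String LEGACY_KEY = "execution.parallelism";

    // Reads the env parallelism: new key first, then the legacy key, then the default.
    static int parallelism(Map<String, String> env, int defaultValue) {
        String value = env.get(KEY);
        if (value == null) {
            value = env.get(LEGACY_KEY);
        }
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }
}
```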





[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1610449476

   @all Can anyone help review this PR?




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "Carl-Zhou-CN (via GitHub)" <gi...@apache.org>.
Carl-Zhou-CN commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1430811118


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.seatunnel.common.PropertiesUtil.setOption;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
+
+/**
+ * Pulsar sink implementation using the SeaTunnel sink API. This class contains the methods to create
+ * {@link PulsarSinkWriter} and {@link PulsarSinkCommitter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class PulsarSink

Review Comment:
   @Hisoka-X cc





[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1715042284

   > > <img alt="image" width="952" src="https://user-images.githubusercontent.com/40714172/265675906-e33dc14f-a9de-4aa6-a73a-b63704bae482.png"> Why is there such an exception? It has nothing to do with the module I submitted, and the latest code has also been merged.
   > 
   > Hi @lightzhao, please rebase onto the dev branch and push again. @ic4y, can you take a look at the CI error?
   
   Done. Please run CI.




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1401426109


##########
docs/en/connector-v2/sink/pulsar.md:
##########
@@ -0,0 +1,139 @@
+# Apache Pulsar
+
+> Apache Pulsar sink connector
+
+## Description
+
+Sink connector for Apache Pulsar.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+## Options
+
+|         name         |  type  | required |    default value    |
+|----------------------|--------|----------|---------------------|

Review Comment:
   Please update the doc format to follow the Kafka sink doc:
   
   https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Kafka.md



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.pulsar;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeDataGenerator;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static java.time.temporal.ChronoUnit.SECONDS;
+
+@Slf4j
+public class PulsarSinkIT extends TestSuiteBase implements TestResource {
+
+    private static final String PULSAR_IMAGE_NAME = "apachepulsar/pulsar:2.3.1";
+    public static final String PULSAR_HOST = "pulsar.e2e.sink";
+    public static final String TOPIC = "topic-test01";
+    private PulsarContainer pulsarContainer;
+    private PulsarClient client;
+    private Producer<byte[]> producer;
+
+    private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {
+                        "c_map",
+                        "c_array",
+                        "c_string",
+                        "c_boolean",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_bytes",
+                        "c_date",
+                        "c_timestamp"
+                    },
+                    new SeaTunnelDataType[] {
+                        new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+                        ArrayType.INT_ARRAY_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.BOOLEAN_TYPE,
+                        BasicType.BYTE_TYPE,
+                        BasicType.SHORT_TYPE,
+                        BasicType.INT_TYPE,
+                        BasicType.LONG_TYPE,
+                        BasicType.FLOAT_TYPE,
+                        BasicType.DOUBLE_TYPE,
+                        new DecimalType(38, 10),
+                        PrimitiveByteArrayType.INSTANCE,
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE
+                    });
+
+    @Override
+    @BeforeAll
+    public void startUp() throws Exception {
+        pulsarContainer =
+                new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(PULSAR_HOST)
+                        .withStartupTimeout(Duration.of(400, SECONDS))
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));
+
+        Startables.deepStart(Stream.of(pulsarContainer)).join();
+        Awaitility.given()
+                .ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initTopic);
+    }
+
+    @Override
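+    public void tearDown() throws Exception {
+        // Close in reverse order of creation: producer, then client, then the broker container.
+        if (producer != null) {
+            producer.close();
+        }
+        if (client != null) {
+            client.close();
+        }
+        if (pulsarContainer != null) {
+            pulsarContainer.close();
+        }
+    }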
+
+    private void initTopic() throws PulsarClientException {
+        client = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build();
+        producer = client.newProducer(Schema.BYTES).topic(TOPIC).create();
+        produceData();
+    }
+
+    private void produceData() {
+
+        try {
+            FakeConfig fakeConfig = FakeConfig.buildWithConfig(ConfigFactory.empty());
+            FakeDataGenerator fakeDataGenerator =
+                    new FakeDataGenerator(SEATUNNEL_ROW_TYPE, fakeConfig);
+            SimpleCollector simpleCollector = new SimpleCollector();
+            fakeDataGenerator.collectFakedRows(100, simpleCollector);
+            JsonSerializationSchema jsonSerializationSchema =
+                    new JsonSerializationSchema(SEATUNNEL_ROW_TYPE);
+            for (SeaTunnelRow seaTunnelRow : simpleCollector.getList()) {
+                producer.send(jsonSerializationSchema.serialize(seaTunnelRow));
+            }
+        } catch (PulsarClientException e) {
+            throw new RuntimeException("produce data error", e);
+        }
+    }
+
+    private static class SimpleCollector implements Collector<SeaTunnelRow> {
+        @Getter private List<SeaTunnelRow> list = new ArrayList<>();
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            list.add(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+
+    @TestTemplate
+    void testPulsarSink(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/pulsar_to_pulsar.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());

Review Comment:
   Please consume the data from the sink topic and check the record count and format.
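One way to act on this review comment: after the job finishes, consume the sink topic (for example with a Pulsar `Consumer<byte[]>` and its `receive(timeout, unit)` method), decode each message as UTF-8, and check both the count and the format. A minimal, dependency-free sketch of the checking half only; the class name `SinkOutputCheck` is hypothetical, and the expected count (presumably the 100 rows written by `produceData`, if `pulsar_to_pulsar.conf` copies them one-to-one) is an assumption:

```java
import java.util.Arrays;
import java.util.List;

public class SinkOutputCheck {

    // Field names taken from SEATUNNEL_ROW_TYPE in PulsarSinkIT.
    static final List<String> EXPECTED_FIELDS =
            Arrays.asList(
                    "c_map", "c_array", "c_string", "c_boolean", "c_tinyint", "c_smallint",
                    "c_int", "c_bigint", "c_float", "c_double", "c_decimal", "c_bytes",
                    "c_date", "c_timestamp");

    // Crude format check: the message must look like a JSON object and mention every
    // expected field name as a quoted string. It does not parse or type-check values.
    static boolean looksLikeExpectedJson(String message) {
        String trimmed = message.trim();
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            return false;
        }
        for (String field : EXPECTED_FIELDS) {
            if (!trimmed.contains("\"" + field + "\"")) {
                return false;
            }
        }
        return true;
    }

    // Returns an error description, or null when both the count and the format match.
    static String check(List<String> consumed, int expectedCount) {
        if (consumed.size() != expectedCount) {
            return "expected " + expectedCount + " messages but consumed " + consumed.size();
        }
        for (int i = 0; i < consumed.size(); i++) {
            if (!looksLikeExpectedJson(consumed.get(i))) {
                return "message " + i + " does not match the expected JSON format";
            }
        }
        return null;
    }
}
```

The string check is deliberately crude; parsing each message with a JSON library and comparing typed values against the generated rows would catch more format errors.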





Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1774326270

   @hailin0 @EricJoy2048 @Hisoka-X @ic4y PTAL.




[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1564187998

   Please approve CI.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "TyrantLucifer (via GitHub)" <gi...@apache.org>.
TyrantLucifer commented on PR #4382:
URL: https://github.com/apache/incubator-seatunnel/pull/4382#issuecomment-1494013962

   > Hello everyone, why is the automatic CI check no longer run after updating the code, and why does it now need to be approved?
   
   Yes, Infra changed this rule. CI runs now need to be approved.




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1867175556

   @hailin0 @Carl-Zhou-CN @Hisoka-X PTAL.




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1424756757


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.seatunnel.common.PropertiesUtil.setOption;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
+
+/**
+ * Pulsar sink implementation using the SeaTunnel sink API. This class contains the methods to create
+ * {@link PulsarSinkWriter} and {@link PulsarSinkCommitter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class PulsarSink

Review Comment:
   Do the Kafka and Pulsar sinks need multi-table sink support? Multiple topics are supported.
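On the multi-table question: if one sink instance had to serve several tables, one option is to derive a topic per table. A minimal sketch of such routing, assuming a hypothetical one-topic-per-table naming rule under a fixed tenant and namespace; `TableTopicRouting` and its methods are illustrative only, and only the `persistent://tenant/namespace/topic` layout is standard Pulsar topic naming:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TableTopicRouting {

    // Hypothetical naming rule: one persistent topic per table under tenant/namespace.
    static String topicFor(String tenant, String namespace, String tableName) {
        return "persistent://" + tenant + "/" + namespace + "/" + tableName;
    }

    // Groups (tableName, payload) pairs by destination topic, preserving arrival order
    // both across topics and within each topic.
    static Map<String, List<String>> route(List<String[]> rows, String tenant, String namespace) {
        Map<String, List<String>> byTopic = new LinkedHashMap<>();
        for (String[] row : rows) {
            String topic = topicFor(tenant, namespace, row[0]);
            byTopic.computeIfAbsent(topic, t -> new ArrayList<>()).add(row[1]);
        }
        return byTopic;
    }
}
```

Whether this is worth doing depends on the answer to the question above: if a single configured topic (or a fixed topic list) is enough, per-table routing adds complexity without benefit.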





[GitHub] [incubator-seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/incubator-seatunnel/pull/4382#issuecomment-1495300982

   > > Hello everyone, why is the automatic CI check no longer run after updating the code, and why does it now need to be approved?
   > 
   > Yes, Infra changed this rule. CI runs now need to be approved.
   
   Please review #3990 first; the Pulsar sink e2e test will be added after that. Thanks.




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 merged PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1407080100


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarSinkIT.java:
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.pulsar;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
+import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeDataGenerator;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static java.time.temporal.ChronoUnit.SECONDS;
+
+@Slf4j
+public class PulsarSinkIT extends TestSuiteBase implements TestResource {
+
+    private static final String PULSAR_IMAGE_NAME = "apachepulsar/pulsar:2.3.1";
+    public static final String PULSAR_HOST = "pulsar.e2e.sink";
+    public static final String TOPIC = "topic-test01";
+    private PulsarContainer pulsarContainer;
+    private PulsarClient client;
+    private Producer<byte[]> producer;
+
+    private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {
+                        "c_map",
+                        "c_array",
+                        "c_string",
+                        "c_boolean",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_bytes",
+                        "c_date",
+                        "c_timestamp"
+                    },
+                    new SeaTunnelDataType[] {
+                        new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
+                        ArrayType.INT_ARRAY_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.BOOLEAN_TYPE,
+                        BasicType.BYTE_TYPE,
+                        BasicType.SHORT_TYPE,
+                        BasicType.INT_TYPE,
+                        BasicType.LONG_TYPE,
+                        BasicType.FLOAT_TYPE,
+                        BasicType.DOUBLE_TYPE,
+                        new DecimalType(38, 10),
+                        PrimitiveByteArrayType.INSTANCE,
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE
+                    });
+
+    @Override
+    @BeforeAll
+    public void startUp() throws Exception {
+        pulsarContainer =
+                new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(PULSAR_HOST)
+                        .withStartupTimeout(Duration.of(400, SECONDS))
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));
+
+        Startables.deepStart(Stream.of(pulsarContainer)).join();
+        Awaitility.given()
+                .ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::initTopic);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        pulsarContainer.close();
+        client.close();
+        producer.close();
+    }
+
+    private void initTopic() throws PulsarClientException {
+        client = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build();
+        producer = client.newProducer(Schema.BYTES).topic(TOPIC).create();
+        produceData();
+    }
+
+    private void produceData() {
+
+        try {
+            FakeConfig fakeConfig = FakeConfig.buildWithConfig(ConfigFactory.empty());
+            FakeDataGenerator fakeDataGenerator =
+                    new FakeDataGenerator(SEATUNNEL_ROW_TYPE, fakeConfig);
+            SimpleCollector simpleCollector = new SimpleCollector();
+            fakeDataGenerator.collectFakedRows(100, simpleCollector);
+            JsonSerializationSchema jsonSerializationSchema =
+                    new JsonSerializationSchema(SEATUNNEL_ROW_TYPE);
+            for (SeaTunnelRow seaTunnelRow : simpleCollector.getList()) {
+                producer.send(jsonSerializationSchema.serialize(seaTunnelRow));
+            }
+        } catch (PulsarClientException e) {
+            throw new RuntimeException("produce data error", e);
+        }
+    }
+
+    private static class SimpleCollector implements Collector<SeaTunnelRow> {
+        @Getter private List<SeaTunnelRow> list = new ArrayList<>();
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            list.add(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+
+    @TestTemplate
+    void testPulsarSink(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/pulsar_to_pulsar.conf");
+        Assertions.assertEquals(execResult.getExitCode(), 0);

Review Comment:
   done
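The `tearDown()` in the hunk above closes `pulsarContainer` first and `client`/`producer` afterwards. That means both Pulsar objects are closed after the broker they talk to has already stopped, which may surface as close errors or timeouts. It also throws a `NullPointerException` if `startUp()` failed before `initTopic()` ever assigned `client`. Below is a minimal JDK-only sketch of one way to close test resources in reverse creation order while still attempting every close. `ReverseCloser` is a hypothetical helper and is not part of SeaTunnel, Testcontainers or JUnit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: closes registered resources in reverse registration order. */
final class ReverseCloser implements AutoCloseable {
    private final List<AutoCloseable> resources = new ArrayList<>();

    /** Registers a resource; null (e.g. a client that was never created) is ignored. */
    void register(AutoCloseable resource) {
        if (resource != null) {
            resources.add(resource);
        }
    }

    @Override
    public void close() throws Exception {
        Exception first = null;
        // Most recently registered first: producer, then client, then container.
        for (int i = resources.size() - 1; i >= 0; i--) {
            try {
                resources.get(i).close();
            } catch (Exception e) {
                // Keep closing the remaining resources; report the first failure,
                // with any later ones attached as suppressed exceptions.
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        resources.clear();
        if (first != null) {
            throw first;
        }
    }
}
```

`PulsarContainer`, `PulsarClient` and `Producer` are all `AutoCloseable`. Each one could therefore be registered right after it is created, and `tearDown()` would reduce to `closer.close()`.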



##########
docs/en/connector-v2/sink/pulsar.md:
##########
@@ -0,0 +1,139 @@
+# Apache Pulsar
+
+> Apache Pulsar sink connector
+
+## Description
+
+Sink connector for Apache Pulsar.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+## Options
+
+|         name         |  type  | required |    default value    |
+|----------------------|--------|----------|---------------------|

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "Carl-Zhou-CN (via GitHub)" <gi...@apache.org>.
Carl-Zhou-CN commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1411796124


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+
+import com.google.auto.service.AutoService;
+
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
+
+@AutoService(Factory.class)
+public class PulsarSinkFactory implements TableSinkFactory {

Review Comment:
   Please add TableSourceFactory/TableSinkFactory support to the connector; see the following issue for upgrade instructions: https://github.com/apache/seatunnel/issues/5651.



##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarSinkState.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.state;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class PulsarSinkState implements Serializable {
+
+    /** The transaction id. */
+    private final TxnID txnID;

Review Comment:
   Wouldn't it be better to store the 'topic' as part of the state?
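To make the suggestion concrete, here is a minimal sketch of a sink state that records the destination topic alongside the transaction id. It is a stand-in, not the PR's `PulsarSinkState`. To stay runnable with the JDK alone, it stores the two `long` halves of the transaction id (Pulsar's `TxnID` exposes them as `getMostSigBits()`/`getLeastSigBits()`) rather than a `TxnID` object. The class and field names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical sink state: the topic an open transaction writes to, plus the transaction id. */
final class TopicAwareSinkState implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String topic;
    private final long txnMostSigBits;
    private final long txnLeastSigBits;

    TopicAwareSinkState(String topic, long txnMostSigBits, long txnLeastSigBits) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.txnMostSigBits = txnMostSigBits;
        this.txnLeastSigBits = txnLeastSigBits;
    }

    String getTopic() { return topic; }

    long getTxnMostSigBits() { return txnMostSigBits; }

    long getTxnLeastSigBits() { return txnLeastSigBits; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicAwareSinkState)) return false;
        TopicAwareSinkState other = (TopicAwareSinkState) o;
        return topic.equals(other.topic)
                && txnMostSigBits == other.txnMostSigBits
                && txnLeastSigBits == other.txnLeastSigBits;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, txnMostSigBits, txnLeastSigBits);
    }

    /** Round-trips the state through Java serialization to show it survives being snapshotted. */
    static TopicAwareSinkState roundTrip(TopicAwareSinkState state)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (TopicAwareSinkState) in.readObject();
        }
    }
}
```

Carrying the topic would let a restored writer, or a later multi-table version of the sink, tell which destination each pending transaction belongs to. Whether that is needed depends on how the committer restores transactions.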



##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+
+import com.google.auto.service.AutoService;
+
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
+
+@AutoService(Factory.class)
+public class PulsarSinkFactory implements TableSinkFactory {

Review Comment:
   Support TableSourceFactory/TableSinkFactory on connector,please refer to the following link for the upgrade instructions: https://github.com/apache/seatunnel/issues/5651.



##########
docs/en/connector-v2/sink/pulsar.md:
##########
@@ -0,0 +1,174 @@
+# Apache Pulsar

Review Comment:
   docs/en/connector-v2/sink/pulsar.md: please use the Kafka sink document as a reference for how this document should be upgraded:
   https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Kafka.md



##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.seatunnel.common.PropertiesUtil.setOption;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
+
+/**
+ * Pulsar Sink implementation by using SeaTunnel sink API. This class contains the method to create
+ * {@link PulsarSinkWriter} and {@link PulsarSinkCommitter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class PulsarSink

Review Comment:
   Please add multi-table sink support; see the following issue for upgrade instructions: https://github.com/apache/seatunnel/issues/5652
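The sketch below illustrates only the general idea. It does not reproduce the interfaces described in issue #5652, which should be followed for the actual upgrade. A multi-table sink has to route each incoming row to a destination chosen per table, and the sketch keeps one lazily created writer per table identifier. `TaggedRow`, `TableWriter` and `TableRoutingWriter` are all hypothetical. In a Pulsar sink, each per-table writer would presumably wrap its own producer for that table's topic.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical row carrying the identifier of the table it came from. */
final class TaggedRow {
    final String tableId;
    final Object[] fields;

    TaggedRow(String tableId, Object... fields) {
        this.tableId = tableId;
        this.fields = fields;
    }
}

/** Hypothetical per-table writer (e.g. one producer per destination topic). */
interface TableWriter extends AutoCloseable {
    void write(TaggedRow row);

    @Override
    void close();
}

/** Routes rows to one writer per table, creating each writer on first use. */
final class TableRoutingWriter implements AutoCloseable {
    private final Function<String, TableWriter> writerFactory;
    private final Map<String, TableWriter> writers = new LinkedHashMap<>();

    TableRoutingWriter(Function<String, TableWriter> writerFactory) {
        this.writerFactory = writerFactory;
    }

    void write(TaggedRow row) {
        // Look up (or lazily create) the writer for this row's table, then delegate.
        writers.computeIfAbsent(row.tableId, writerFactory).write(row);
    }

    int writerCount() {
        return writers.size();
    }

    @Override
    public void close() {
        for (TableWriter writer : writers.values()) {
            writer.close();
        }
        writers.clear();
    }
}
```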



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "Carl-Zhou-CN (via GitHub)" <gi...@apache.org>.
Carl-Zhou-CN commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1430809381


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PARTITION_KEY_FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.SEMANTICS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TEXT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TRANSACTION_TIMEOUT;
+
+public class PulsarSinkWriter
+        implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
+
+    private Context context;
+    private Producer<byte[]> producer;
+    private PulsarClient pulsarClient;
+    private SerializationSchema serializationSchema;
+    private SerializationSchema keySerializationSchema;
+    private TransactionImpl transaction;
+    private int transactionTimeout = TRANSACTION_TIMEOUT.defaultValue();
+    private PulsarSemantics pulsarSemantics = SEMANTICS.defaultValue();
+    private final AtomicLong pendingMessages;
+
+    public PulsarSinkWriter(
+            Context context,
+            PulsarClientConfig clientConfig,
+            SeaTunnelRowType seaTunnelRowType,
+            ReadonlyConfig pluginConfig,
+            List<PulsarSinkState> pulsarStates) {
+        this.context = context;
+        String topic = pluginConfig.get(TOPIC);
+        String format = pluginConfig.get(FORMAT);
+        String delimiter = pluginConfig.get(FIELD_DELIMITER);
+        Integer transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT);
+        PulsarSemantics pulsarSemantics = pluginConfig.get(SEMANTICS);
+        MessageRoutingMode messageRoutingMode = pluginConfig.get(MESSAGE_ROUTING_MODE);
+        this.serializationSchema = createSerializationSchema(seaTunnelRowType, format, delimiter);
+        List<String> partitionKeyList = getPartitionKeyFields(pluginConfig, seaTunnelRowType);
+        this.keySerializationSchema =
+                createKeySerializationSchema(partitionKeyList, seaTunnelRowType);
+        this.pulsarClient = PulsarConfigUtil.createClient(clientConfig, pulsarSemantics);
+
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Pulsar transaction create fail.");
+            }
+        }
+        try {
+            this.producer =
+                    PulsarConfigUtil.createProducer(
+                            pulsarClient, topic, pulsarSemantics, pluginConfig, messageRoutingMode);
+        } catch (PulsarClientException e) {
+            throw new PulsarConnectorException(
+                    PulsarConnectorErrorCode.CREATE_PRODUCER_FAILED,
+                    "Pulsar Producer create fail.");
+        }
+        this.pendingMessages = new AtomicLong(0);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        byte[] message = serializationSchema.serialize(element);
+        byte[] key = null;
+        if (keySerializationSchema != null) {
+            key = keySerializationSchema.serialize(element);
+        }
+        TypedMessageBuilder<byte[]> typedMessageBuilder =
+                PulsarConfigUtil.createTypedMessageBuilder(producer, transaction);
+        if (key != null) {
+            typedMessageBuilder.keyBytes(key);
+        }
+        typedMessageBuilder.value(message);
+        if (PulsarSemantics.NON == pulsarSemantics) {
+            typedMessageBuilder.sendAsync();
+        } else {
+            pendingMessages.incrementAndGet();
+            CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
+            future.whenComplete(
+                    (id, ex) -> {
+                        pendingMessages.decrementAndGet();
+                        if (ex != null) {
+                            throw new PulsarConnectorException(
+                                    PulsarConnectorErrorCode.SEND_MESSAGE_FAILED,
+                                    "send message failed");
+                        }
+                    });
+        }
+    }
+
+    @Override
+    public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            PulsarCommitInfo pulsarCommitInfo = new PulsarCommitInfo(this.transaction.getTxnID());
+            return Optional.of(pulsarCommitInfo);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
+        if (PulsarSemantics.NON != pulsarSemantics) {
+            /** flush pending messages */
+            producer.flush();
+            while (pendingMessages.longValue() > 0) {
+                producer.flush();
+            }
+        }
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            List<PulsarSinkState> pulsarSinkStates =
+                    Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Pulsar transaction create fail.");
+            }
+            return pulsarSinkStates;
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void abortPrepare() {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            transaction.abort();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        producer.close();
+        pulsarClient.close();
+    }
+
+    /**
+     * get message SerializationSchema
+     *
+     * @param rowType
+     * @param format
+     * @param delimiter
+     * @return
+     */
+    private SerializationSchema createSerializationSchema(
+            SeaTunnelRowType rowType, String format, String delimiter) {
+        if (DEFAULT_FORMAT.equals(format)) {
+            return new JsonSerializationSchema(rowType);
+        } else if (TEXT_FORMAT.equals(format)) {
+            return TextSerializationSchema.builder()
+                    .seaTunnelRowType(rowType)
+                    .delimiter(delimiter)
+                    .build();
+        } else {
+            throw new SeaTunnelJsonFormatException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+        }
+    }
+
+    /**
+     * get key SerializationSchema
+     *
+     * @param keyFieldNames
+     * @param seaTunnelRowType
+     * @return
+     */
+    public static SerializationSchema createKeySerializationSchema(
+            List<String> keyFieldNames, SeaTunnelRowType seaTunnelRowType) {
+        if (keyFieldNames == null || keyFieldNames.isEmpty()) {
+            return null;
+        }
+        int[] keyFieldIndexArr = new int[keyFieldNames.size()];
+        SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()];
+        for (int i = 0; i < keyFieldNames.size(); i++) {
+            String keyFieldName = keyFieldNames.get(i);
+            int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
+            keyFieldIndexArr[i] = rowFieldIndex;
+            keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
+        }
+        SeaTunnelRowType keyType =
+                new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
+        SerializationSchema keySerializationSchema = new JsonSerializationSchema(keyType);
+
+        Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor =
+                row -> {
+                    Object[] keyFields = new Object[keyFieldIndexArr.length];
+                    for (int i = 0; i < keyFieldIndexArr.length; i++) {
+                        keyFields[i] = row.getField(keyFieldIndexArr[i]);
+                    }
+                    return new SeaTunnelRow(keyFields);
+                };
+        return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
+    }
+
+    /**
+     * get partition key field list
+     *
+     * @param pluginConfig

Review Comment:
   These parameter comments add no information; can we delete them?
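In the hunk above, `write()` and `snapshotState()` count in-flight `sendAsync()` calls with an `AtomicLong`. The `PulsarConnectorException` thrown inside `whenComplete` is never rethrown to the caller. It only completes the stage returned by `whenComplete` exceptionally, and that stage is discarded, so a failed send can go unnoticed. The sketch below is a minimal JDK-only alternative. It assumes it is acceptable to surface the failure at the next flush/checkpoint: it records the first failure and rethrows it while waiting for pending sends, instead of spinning on `flush()`. `InFlightTracker` is hypothetical, and `CompletableFuture` stands in for the future returned by Pulsar's `sendAsync()`.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical tracker for asynchronous sends that remembers the first failure. */
final class InFlightTracker {
    private final AtomicLong pending = new AtomicLong();
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
    private final Object lock = new Object();

    /** Registers one send; pass the future returned by the asynchronous send call. */
    <T> void track(CompletableFuture<T> sendFuture) {
        pending.incrementAndGet();
        sendFuture.whenComplete(
                (result, error) -> {
                    if (error != null) {
                        // Record the failure before decrementing so awaitAll() sees it.
                        firstFailure.compareAndSet(null, error);
                    }
                    if (pending.decrementAndGet() == 0) {
                        synchronized (lock) {
                            lock.notifyAll();
                        }
                    }
                });
    }

    long pending() {
        return pending.get();
    }

    /** Blocks until all tracked sends finish, then rethrows the first failure, if any. */
    void awaitAll(long timeoutMillis)
            throws IOException, InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (lock) {
            while (pending.get() > 0) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new TimeoutException(pending.get() + " sends still pending");
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remaining);
            }
        }
        Throwable failure = firstFailure.getAndSet(null);
        if (failure != null) {
            throw new IOException("asynchronous send failed", failure);
        }
    }
}
```

In the writer, `track(typedMessageBuilder.sendAsync())` would replace the manual counter. `snapshotState()` could then call `producer.flush()` once followed by `awaitAll(...)`, so a failed send fails the checkpoint instead of being silently dropped.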



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "hailin0 (via GitHub)" <gi...@apache.org>.
hailin0 commented on code in PR #4382:
URL: https://github.com/apache/incubator-seatunnel/pull/4382#discussion_r1148962126


##########
docs/en/connector-v2/sink/pulsar.md:
##########
@@ -0,0 +1,144 @@
+# Apache Pulsar
+
+> Apache Pulsar sink connector
+
+## Description
+
+Sink connector for Apache Pulsar.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)

Review Comment:
   ```suggestion
   ```



##########
docs/en/connector-v2/sink/pulsar.md:
##########
@@ -0,0 +1,144 @@
+# Apache Pulsar
+
+> Apache Pulsar sink connector
+
+## Description
+
+Sink connector for Apache Pulsar.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)

Review Comment:
   ```suggestion
   ```
   
   
   https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/concept/connector-v2-features.md#sink-connector-features





Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1432405867


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PARTITION_KEY_FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.SEMANTICS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TEXT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TRANSACTION_TIMEOUT;
+
+public class PulsarSinkWriter
+        implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
+
+    private Context context;
+    private Producer<byte[]> producer;
+    private PulsarClient pulsarClient;
+    private SerializationSchema serializationSchema;
+    private SerializationSchema keySerializationSchema;
+    private TransactionImpl transaction;
+    private int transactionTimeout = TRANSACTION_TIMEOUT.defaultValue();
+    private PulsarSemantics pulsarSemantics = SEMANTICS.defaultValue();
+    private final AtomicLong pendingMessages;
+
+    public PulsarSinkWriter(
+            Context context,
+            PulsarClientConfig clientConfig,
+            SeaTunnelRowType seaTunnelRowType,
+            ReadonlyConfig pluginConfig,
+            List<PulsarSinkState> pulsarStates) {
+        this.context = context;
+        String topic = pluginConfig.get(TOPIC);
+        String format = pluginConfig.get(FORMAT);
+        String delimiter = pluginConfig.get(FIELD_DELIMITER);
+        this.transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT);
+        this.pulsarSemantics = pluginConfig.get(SEMANTICS);
+        MessageRoutingMode messageRoutingMode = pluginConfig.get(MESSAGE_ROUTING_MODE);
+        this.serializationSchema = createSerializationSchema(seaTunnelRowType, format, delimiter);
+        List<String> partitionKeyList = getPartitionKeyFields(pluginConfig, seaTunnelRowType);
+        this.keySerializationSchema =
+                createKeySerializationSchema(partitionKeyList, seaTunnelRowType);
+        this.pulsarClient = PulsarConfigUtil.createClient(clientConfig, pulsarSemantics);
+
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Pulsar transaction create fail.");
+            }
+        }
+        try {
+            this.producer =
+                    PulsarConfigUtil.createProducer(
+                            pulsarClient, topic, pulsarSemantics, pluginConfig, messageRoutingMode);
+        } catch (PulsarClientException e) {
+            throw new PulsarConnectorException(
+                    PulsarConnectorErrorCode.CREATE_PRODUCER_FAILED,
+                    "Pulsar Producer create fail.");
+        }
+        this.pendingMessages = new AtomicLong(0);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        byte[] message = serializationSchema.serialize(element);
+        byte[] key = null;
+        if (keySerializationSchema != null) {
+            key = keySerializationSchema.serialize(element);
+        }
+        TypedMessageBuilder<byte[]> typedMessageBuilder =
+                PulsarConfigUtil.createTypedMessageBuilder(producer, transaction);
+        if (key != null) {
+            typedMessageBuilder.keyBytes(key);
+        }
+        typedMessageBuilder.value(message);
+        if (PulsarSemantics.NON == pulsarSemantics) {
+            typedMessageBuilder.sendAsync();
+        } else {
+            pendingMessages.incrementAndGet();
+            CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
+            future.whenComplete(
+                    (id, ex) -> {
+                        pendingMessages.decrementAndGet();
+                        if (ex != null) {
+                            throw new PulsarConnectorException(
+                                    PulsarConnectorErrorCode.SEND_MESSAGE_FAILED,
+                                    "send message failed");
+                        }
+                    });
+        }
+    }
+
+    @Override
+    public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            PulsarCommitInfo pulsarCommitInfo = new PulsarCommitInfo(this.transaction.getTxnID());
+            return Optional.of(pulsarCommitInfo);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
+        if (PulsarSemantics.NON != pulsarSemantics) {
+            /** flush pending messages */
+            producer.flush();
+            while (pendingMessages.longValue() > 0) {
+                producer.flush();
+            }
+        }
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            List<PulsarSinkState> pulsarSinkStates =
+                    Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Pulsar transaction create fail.");
+            }
+            return pulsarSinkStates;
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void abortPrepare() {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            transaction.abort();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        producer.close();
+        pulsarClient.close();
+    }
+
+    /**
+     * get message SerializationSchema
+     *
+     * @param rowType
+     * @param format
+     * @param delimiter
+     * @return
+     */
+    private SerializationSchema createSerializationSchema(
+            SeaTunnelRowType rowType, String format, String delimiter) {
+        if (DEFAULT_FORMAT.equals(format)) {
+            return new JsonSerializationSchema(rowType);
+        } else if (TEXT_FORMAT.equals(format)) {
+            return TextSerializationSchema.builder()
+                    .seaTunnelRowType(rowType)
+                    .delimiter(delimiter)
+                    .build();
+        } else {
+            throw new SeaTunnelJsonFormatException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+        }
+    }
+
+    /**
+     * get key SerializationSchema
+     *
+     * @param keyFieldNames
+     * @param seaTunnelRowType
+     * @return
+     */
+    public static SerializationSchema createKeySerializationSchema(
+            List<String> keyFieldNames, SeaTunnelRowType seaTunnelRowType) {
+        if (keyFieldNames == null || keyFieldNames.isEmpty()) {
+            return null;
+        }
+        int[] keyFieldIndexArr = new int[keyFieldNames.size()];
+        SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()];
+        for (int i = 0; i < keyFieldNames.size(); i++) {
+            String keyFieldName = keyFieldNames.get(i);
+            int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
+            keyFieldIndexArr[i] = rowFieldIndex;
+            keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
+        }
+        SeaTunnelRowType keyType =
+                new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
+        SerializationSchema keySerializationSchema = new JsonSerializationSchema(keyType);
+
+        Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor =
+                row -> {
+                    Object[] keyFields = new Object[keyFieldIndexArr.length];
+                    for (int i = 0; i < keyFieldIndexArr.length; i++) {
+                        keyFields[i] = row.getField(keyFieldIndexArr[i]);
+                    }
+                    return new SeaTunnelRow(keyFields);
+                };
+        return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
+    }
+
+    /**
+     * get partition key field list
+     *
+     * @param pluginConfig

Review Comment:
   done.
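In `write()`, the writer counts in-flight sends in an `AtomicLong`, and `snapshotState()` flushes until that counter reaches zero. There is a caveat. An exception thrown inside a `whenComplete` callback does not reach the caller. It only completes the stage returned by `whenComplete` exceptionally, and the writer discards that stage. As a result, a failed send is silently lost.

Below is a minimal stdlib sketch of an alternative. It records the first failure and rethrows it when the caller waits for pending sends. The `PendingSendTracker` class is hypothetical, and a plain `CompletableFuture` stands in for Pulsar's `sendAsync()` result. A real writer would still call `producer.flush()` so that batched messages are actually sent. The sketch only replaces the busy loop and the lost exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: tracks in-flight async sends and surfaces the first failure to the caller. */
final class PendingSendTracker {
    private final AtomicLong pending = new AtomicLong();
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
    private final Object lock = new Object();

    /** Registers an async send. The callback only records state and never throws. */
    <T> void track(CompletableFuture<T> send) {
        pending.incrementAndGet();
        send.whenComplete(
                (result, error) -> {
                    if (error != null) {
                        // Keep only the first failure; later ones are dropped.
                        firstFailure.compareAndSet(null, error);
                    }
                    synchronized (lock) {
                        pending.decrementAndGet();
                        lock.notifyAll();
                    }
                });
    }

    /** Blocks until every tracked send has completed, then rethrows the first failure, if any. */
    void awaitAll() throws InterruptedException {
        synchronized (lock) {
            while (pending.get() > 0) {
                lock.wait();
            }
        }
        Throwable failure = firstFailure.getAndSet(null);
        if (failure != null) {
            throw new IllegalStateException("send message failed", failure);
        }
    }

    long pendingCount() {
        return pending.get();
    }
}
```

The decrement and `notifyAll()` happen under the same lock that `awaitAll()` waits on, so a completion cannot slip in between the check and the wait.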





Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1829271894

   @hailin0 @EricJoy2048 @Hisoka-X @ic4y PTAL.




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1424749227


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/state/PulsarSinkState.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.state;
+
+import org.apache.pulsar.client.api.transaction.TxnID;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+public class PulsarSinkState implements Serializable {
+
+    /** The transaction id. */
+    private final TxnID txnID;

Review Comment:
   Only the TxnID is required to commit, so the topic parameter is not needed.
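Because committing needs only the transaction id, the checkpointed state can be reduced to the two longs that make up a Pulsar `TxnID` (its most and least significant bits). Below is a minimal stdlib sketch that round-trips such a state through Java serialization. It assumes checkpoint state is stored with Java serialization. The `TxnOnlyState` class is a hypothetical stand-in for `PulsarSinkState` and does not depend on the Pulsar client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical stand-in for PulsarSinkState: only the transaction id is kept. */
final class TxnOnlyState implements Serializable {
    private static final long serialVersionUID = 1L;

    // The two halves of a Pulsar transaction id.
    private final long mostSigBits;
    private final long leastSigBits;

    TxnOnlyState(long mostSigBits, long leastSigBits) {
        this.mostSigBits = mostSigBits;
        this.leastSigBits = leastSigBits;
    }

    long getMostSigBits() {
        return mostSigBits;
    }

    long getLeastSigBits() {
        return leastSigBits;
    }

    /** Serializes the state with standard Java serialization. */
    static byte[] toBytes(TxnOnlyState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        return bytes.toByteArray();
    }

    /** Restores a state previously written by toBytes. */
    static TxnOnlyState fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (TxnOnlyState) in.readObject();
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TxnOnlyState)) {
            return false;
        }
        TxnOnlyState other = (TxnOnlyState) o;
        return mostSigBits == other.mostSigBits && leastSigBits == other.leastSigBits;
    }

    @Override
    public int hashCode() {
        return Objects.hash(mostSigBits, leastSigBits);
    }
}
```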





Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1886084634

   @hailin0 @Hisoka-X @Carl-Zhou-CN PTAL.




[GitHub] [incubator-seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/incubator-seatunnel/pull/4382#issuecomment-1484742048

   > Please update `release-note.md` and add an e2e test case.
   > 
   > reference https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e
   
   Waiting for #3990, the PR in which I added the pulsar-e2e module. The doc has been updated.




[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1606402149

   @TyrantLucifer @hailin0 @EricJoy2048 @Hisoka-X PTAL.




[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1594301789

   @Hisoka-X @hailin0 @EricJoy2048 please approve ci.




[GitHub] [seatunnel] ic4y commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1666496963

   Waiting for CI/CD.




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "Hisoka-X (via GitHub)" <gi...@apache.org>.
Hisoka-X commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1432441541


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSink.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.seatunnel.common.PropertiesUtil.setOption;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;
+
+/**
+ * Pulsar Sink implementation by using SeaTunnel sink API. This class contains the method to create
+ * {@link PulsarSinkWriter} and {@link PulsarSinkCommitter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class PulsarSink

Review Comment:
   Multi-table support is mainly about accepting multiple upstream tables, so it can be implemented separately in the future.





Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "Carl-Zhou-CN (via GitHub)" <gi...@apache.org>.
Carl-Zhou-CN commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1430809381


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PARTITION_KEY_FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.SEMANTICS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TEXT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TRANSACTION_TIMEOUT;
+
+public class PulsarSinkWriter
+        implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
+
+    private Context context;
+    private Producer<byte[]> producer;
+    private PulsarClient pulsarClient;
+    private SerializationSchema serializationSchema;
+    private SerializationSchema keySerializationSchema;
+    private TransactionImpl transaction;
+    private int transactionTimeout = TRANSACTION_TIMEOUT.defaultValue();
+    private PulsarSemantics pulsarSemantics = SEMANTICS.defaultValue();
+    private final AtomicLong pendingMessages;
+
+    public PulsarSinkWriter(
+            Context context,
+            PulsarClientConfig clientConfig,
+            SeaTunnelRowType seaTunnelRowType,
+            ReadonlyConfig pluginConfig,
+            List<PulsarSinkState> pulsarStates) {
+        this.context = context;
+        String topic = pluginConfig.get(TOPIC);
+        String format = pluginConfig.get(FORMAT);
+        String delimiter = pluginConfig.get(FIELD_DELIMITER);
+        this.transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT);
+        this.pulsarSemantics = pluginConfig.get(SEMANTICS);
+        MessageRoutingMode messageRoutingMode = pluginConfig.get(MESSAGE_ROUTING_MODE);
+        this.serializationSchema = createSerializationSchema(seaTunnelRowType, format, delimiter);
+        List<String> partitionKeyList = getPartitionKeyFields(pluginConfig, seaTunnelRowType);
+        this.keySerializationSchema =
+                createKeySerializationSchema(partitionKeyList, seaTunnelRowType);
+        this.pulsarClient = PulsarConfigUtil.createClient(clientConfig, pulsarSemantics);
+
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Pulsar transaction create fail.");
+            }
+        }
+        try {
+            this.producer =
+                    PulsarConfigUtil.createProducer(
+                            pulsarClient, topic, pulsarSemantics, pluginConfig, messageRoutingMode);
+        } catch (PulsarClientException e) {
+            throw new PulsarConnectorException(
+                    PulsarConnectorErrorCode.CREATE_PRODUCER_FAILED,
+                    "Pulsar Producer create fail.");
+        }
+        this.pendingMessages = new AtomicLong(0);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        byte[] message = serializationSchema.serialize(element);
+        byte[] key = null;
+        if (keySerializationSchema != null) {
+            key = keySerializationSchema.serialize(element);
+        }
+        TypedMessageBuilder<byte[]> typedMessageBuilder =
+                PulsarConfigUtil.createTypedMessageBuilder(producer, transaction);
+        if (key != null) {
+            typedMessageBuilder.keyBytes(key);
+        }
+        typedMessageBuilder.value(message);
+        if (PulsarSemantics.NON == pulsarSemantics) {
+            typedMessageBuilder.sendAsync();
+        } else {
+            pendingMessages.incrementAndGet();
+            CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
+            future.whenComplete(
+                    (id, ex) -> {
+                        pendingMessages.decrementAndGet();
+                        if (ex != null) {
+                            throw new PulsarConnectorException(
+                                    PulsarConnectorErrorCode.SEND_MESSAGE_FAILED,
+                                    "send message failed");
+                        }
+                    });
+        }
+    }
+
+    @Override
+    public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            PulsarCommitInfo pulsarCommitInfo = new PulsarCommitInfo(this.transaction.getTxnID());
+            return Optional.of(pulsarCommitInfo);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
+        if (PulsarSemantics.NON != pulsarSemantics) {
+            /** flush pending messages */
+            producer.flush();
+            while (pendingMessages.longValue() > 0) {
+                producer.flush();
+            }
+        }
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            List<PulsarSinkState> pulsarSinkStates =
+                    Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Pulsar transaction create fail.");
+            }
+            return pulsarSinkStates;
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void abortPrepare() {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            transaction.abort();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        producer.close();
+        pulsarClient.close();
+    }
+
+    /**
+     * get message SerializationSchema
+     *
+     * @param rowType
+     * @param format
+     * @param delimiter
+     * @return
+     */
+    private SerializationSchema createSerializationSchema(
+            SeaTunnelRowType rowType, String format, String delimiter) {
+        if (DEFAULT_FORMAT.equals(format)) {
+            return new JsonSerializationSchema(rowType);
+        } else if (TEXT_FORMAT.equals(format)) {
+            return TextSerializationSchema.builder()
+                    .seaTunnelRowType(rowType)
+                    .delimiter(delimiter)
+                    .build();
+        } else {
+            throw new SeaTunnelJsonFormatException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+        }
+    }
+
+    /**
+     * get key SerializationSchema
+     *
+     * @param keyFieldNames
+     * @param seaTunnelRowType
+     * @return
+     */
+    public static SerializationSchema createKeySerializationSchema(
+            List<String> keyFieldNames, SeaTunnelRowType seaTunnelRowType) {
+        if (keyFieldNames == null || keyFieldNames.isEmpty()) {
+            return null;
+        }
+        int[] keyFieldIndexArr = new int[keyFieldNames.size()];
+        SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()];
+        for (int i = 0; i < keyFieldNames.size(); i++) {
+            String keyFieldName = keyFieldNames.get(i);
+            int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
+            keyFieldIndexArr[i] = rowFieldIndex;
+            keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
+        }
+        SeaTunnelRowType keyType =
+                new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
+        SerializationSchema keySerializationSchema = new JsonSerializationSchema(keyType);
+
+        Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor =
+                row -> {
+                    Object[] keyFields = new Object[keyFieldIndexArr.length];
+                    for (int i = 0; i < keyFieldIndexArr.length; i++) {
+                        keyFields[i] = row.getField(keyFieldIndexArr[i]);
+                    }
+                    return new SeaTunnelRow(keyFields);
+                };
+        return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
+    }
+
+    /**
+     * get partition key field list
+     *
+     * @param pluginConfig

Review Comment:
   These parameter comments are useless. Can we delete them?
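
The `createKeySerializationSchema` helper quoted above resolves each partition-key field name to an index once. Then, for every record, it copies those fields into a smaller key row and serializes that row as JSON. The sketch below shows only the projection step and makes a few assumptions. Rows are plain `Object[]` arrays, and the schema is a `List<String>` of field names standing in for `SeaTunnelRow` and `SeaTunnelRowType`. The names `KeyProjectionSketch` and `keyExtractor` are hypothetical. Unlike the quoted code, the sketch rejects an unknown key field explicitly instead of depending on how `SeaTunnelRowType.indexOf` handles a missing name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the key extraction in createKeySerializationSchema:
// rows are plain Object[] arrays and the row schema is a list of field names.
public class KeyProjectionSketch {

    /** Resolves key field indexes once and returns a per-row projection function. */
    public static Function<Object[], Object[]> keyExtractor(
            List<String> rowFieldNames, List<String> keyFieldNames) {
        int[] keyFieldIndexes = new int[keyFieldNames.size()];
        for (int i = 0; i < keyFieldNames.size(); i++) {
            int index = rowFieldNames.indexOf(keyFieldNames.get(i));
            if (index < 0) {
                // Fail at setup time rather than on the first record.
                throw new IllegalArgumentException(
                        "Unknown partition key field: " + keyFieldNames.get(i));
            }
            keyFieldIndexes[i] = index;
        }
        // Per record: copy only the key fields, in the order they were configured.
        return row -> {
            Object[] key = new Object[keyFieldIndexes.length];
            for (int i = 0; i < keyFieldIndexes.length; i++) {
                key[i] = row[keyFieldIndexes[i]];
            }
            return key;
        };
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("id", "name", "region");
        Function<Object[], Object[]> extractor =
                keyExtractor(schema, Arrays.asList("region", "id"));
        Object[] key = extractor.apply(new Object[] {42, "alice", "eu"});
        System.out.println(Arrays.toString(key)); // prints [eu, 42]
    }
}
```

Resolving the indexes outside the lambda means the per-record work is just an array copy. In the writer, the serialized key is what gets passed to `TypedMessageBuilder.keyBytes`, which Pulsar can use to route messages that share a key to the same partition.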



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1794236442

   @hailin0 @EricJoy2048 @Hisoka-X @ic4y PTAL.




[GitHub] [incubator-seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/incubator-seatunnel/pull/4382#issuecomment-1484752545

   Hello everyone, why does the CI check no longer run automatically after I update the code? Why does it now need to be approved?




[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1680378559

   @ic4y please approve CI. Thanks.




[GitHub] [seatunnel] ic4y closed pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "ic4y (via GitHub)" <gi...@apache.org>.
ic4y closed pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.
URL: https://github.com/apache/seatunnel/pull/4382




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1424737649


##########
docs/en/connector-v2/sink/pulsar.md:
##########
@@ -0,0 +1,174 @@
+# Apache Pulsar

Review Comment:
   done.



##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+
+import com.google.auto.service.AutoService;
+
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
+
+@AutoService(Factory.class)
+public class PulsarSinkFactory implements TableSinkFactory {

Review Comment:
   done.
   





[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1567981521

   @Hisoka-X @hailin0 @TyrantLucifer please approve CI.




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "Carl-Zhou-CN (via GitHub)" <gi...@apache.org>.
Carl-Zhou-CN commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1430809811


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PARTITION_KEY_FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.SEMANTICS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TEXT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TRANSACTION_TIMEOUT;
+
+public class PulsarSinkWriter
+        implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
+
+    private Context context;
+    private Producer<byte[]> producer;
+    private PulsarClient pulsarClient;
+    private SerializationSchema serializationSchema;
+    private SerializationSchema keySerializationSchema;
+    private TransactionImpl transaction;
+    private int transactionTimeout = TRANSACTION_TIMEOUT.defaultValue();
+    private PulsarSemantics pulsarSemantics = SEMANTICS.defaultValue();
+    private final AtomicLong pendingMessages;
+
+    public PulsarSinkWriter(
+            Context context,
+            PulsarClientConfig clientConfig,
+            SeaTunnelRowType seaTunnelRowType,
+            ReadonlyConfig pluginConfig,
+            List<PulsarSinkState> pulsarStates) {
+        this.context = context;
+        String topic = pluginConfig.get(TOPIC);
+        String format = pluginConfig.get(FORMAT);
+        String delimiter = pluginConfig.get(FIELD_DELIMITER);
+        Integer transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT);
+        PulsarSemantics pulsarSemantics = pluginConfig.get(SEMANTICS);
+        MessageRoutingMode messageRoutingMode = pluginConfig.get(MESSAGE_ROUTING_MODE);
+        this.serializationSchema = createSerializationSchema(seaTunnelRowType, format, delimiter);
+        List<String> partitionKeyList = getPartitionKeyFields(pluginConfig, seaTunnelRowType);
+        this.keySerializationSchema =
+                createKeySerializationSchema(partitionKeyList, seaTunnelRowType);
+        this.pulsarClient = PulsarConfigUtil.createClient(clientConfig, pulsarSemantics);
+
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Pulsar transaction create fail.");
+            }
+        }
+        try {
+            this.producer =
+                    PulsarConfigUtil.createProducer(
+                            pulsarClient, topic, pulsarSemantics, pluginConfig, messageRoutingMode);
+        } catch (PulsarClientException e) {
+            throw new PulsarConnectorException(
+                    PulsarConnectorErrorCode.CREATE_PRODUCER_FAILED,
+                    "Pulsar Producer create fail.");
+        }
+        this.pendingMessages = new AtomicLong(0);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        byte[] message = serializationSchema.serialize(element);
+        byte[] key = null;
+        if (keySerializationSchema != null) {
+            key = keySerializationSchema.serialize(element);
+        }
+        TypedMessageBuilder<byte[]> typedMessageBuilder =
+                PulsarConfigUtil.createTypedMessageBuilder(producer, transaction);
+        if (key != null) {
+            typedMessageBuilder.keyBytes(key);
+        }
+        typedMessageBuilder.value(message);
+        if (PulsarSemantics.NON == pulsarSemantics) {
+            typedMessageBuilder.sendAsync();
+        } else {
+            pendingMessages.incrementAndGet();
+            CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
+            future.whenComplete(
+                    (id, ex) -> {
+                        pendingMessages.decrementAndGet();
+                        if (ex != null) {
+                            throw new PulsarConnectorException(
+                                    PulsarConnectorErrorCode.SEND_MESSAGE_FAILED,
+                                    "send message failed");
+                        }
+                    });
+        }
+    }
+
+    @Override
+    public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            PulsarCommitInfo pulsarCommitInfo = new PulsarCommitInfo(this.transaction.getTxnID());
+            return Optional.of(pulsarCommitInfo);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
+        if (PulsarSemantics.NON != pulsarSemantics) {
+            /** flush pending messages */
+            producer.flush();
+            while (pendingMessages.longValue() > 0) {
+                producer.flush();
+            }
+        }
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            List<PulsarSinkState> pulsarSinkStates =
+                    Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Pulsar transaction create fail.");
+            }
+            return pulsarSinkStates;
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void abortPrepare() {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            transaction.abort();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        producer.close();
+        pulsarClient.close();
+    }
+
+    /**
+     * get message SerializationSchema
+     *
+     * @param rowType

Review Comment:
   These parameter comments are useless. Can we delete them?
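
In the `write()` method quoted above, the `PulsarConnectorException` thrown inside `whenComplete` does not reach the caller of `write()`. It only completes the future returned by `whenComplete` exceptionally, and that future is discarded. The counter is decremented before the throw, so after a failed send `snapshotState()` can still drain to zero and the checkpoint proceeds as if every message had been delivered. The sketch below shows one common alternative: record the first failure and rethrow it on the writer thread. It is a hypothetical sketch, not the PR's code. Plain `CompletableFuture`s stand in for the futures returned by `sendAsync()`, `IllegalStateException` stands in for the connector's own exception type, and `PendingSendTracker`, `track`, `checkForFailure`, and `pending` are invented names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: track in-flight async sends and surface the first failure to the caller.
// CompletableFuture stands in for the future returned by sendAsync(); names are hypothetical.
public class PendingSendTracker {
    private final AtomicLong pendingMessages = new AtomicLong();
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    /** Registers an async send; a failure is recorded rather than thrown from the callback. */
    public void track(CompletableFuture<?> sendFuture) {
        checkForFailure();
        pendingMessages.incrementAndGet();
        sendFuture.whenComplete(
                (result, ex) -> {
                    if (ex != null) {
                        // Throwing here would only fail the discarded derived future,
                        // so remember the first error instead.
                        firstFailure.compareAndSet(null, ex);
                    }
                    // Decrement after recording, so seeing zero pending implies seeing the failure.
                    pendingMessages.decrementAndGet();
                });
    }

    /** Rethrows a recorded send failure on the writer thread (e.g. from write() or snapshotState()). */
    public void checkForFailure() {
        Throwable failure = firstFailure.get();
        if (failure != null) {
            throw new IllegalStateException("send message failed", failure);
        }
    }

    public long pending() {
        return pendingMessages.get();
    }

    public static void main(String[] args) {
        PendingSendTracker tracker = new PendingSendTracker();
        CompletableFuture<String> ok = new CompletableFuture<>();
        CompletableFuture<String> failed = new CompletableFuture<>();
        tracker.track(ok);
        tracker.track(failed);
        System.out.println(tracker.pending()); // prints 2
        ok.complete("msg-1");
        failed.completeExceptionally(new RuntimeException("broker unavailable"));
        System.out.println(tracker.pending()); // prints 0
        try {
            tracker.checkForFailure();
        } catch (IllegalStateException e) {
            System.out.println(e.getCause().getMessage()); // prints broker unavailable
        }
    }
}
```

In a writer built this way, `snapshotState()` would first wait for `pending()` to reach zero (the quoted code calls `producer.flush()` for this) and then call `checkForFailure()`. A failed send would then fail the checkpoint instead of being silently lost.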





[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1569876776

   Why have I suddenly stopped receiving SeaTunnel notification emails? Can everyone still receive the messages I send?




[GitHub] [seatunnel] EricJoy2048 commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "EricJoy2048 (via GitHub)" <gi...@apache.org>.
EricJoy2048 commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1713243439

   > [image: https://user-images.githubusercontent.com/40714172/265675906-e33dc14f-a9de-4aa6-a73a-b63704bae482.png]
   > Why does this exception occur? It has nothing to do with the module I submitted, and I have already merged the latest code.
   
   Hi @lightzhao, please rebase onto the dev branch and push again. @ic4y, could you take a look at the CI error?




[GitHub] [incubator-seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/incubator-seatunnel/pull/4382#issuecomment-1482448585

   Hello everyone, the CI check does not run automatically. Does it require approval?




[GitHub] [incubator-seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/incubator-seatunnel/pull/4382#issuecomment-1484399309

   Hello everyone, this has been running stably in the test environment for two weeks. Does anyone have time to help review it?




[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1694877438

   @ALL please approve CI. Thanks.




[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1706399550

   [image: https://github.com/apache/seatunnel/assets/40714172/e33dc14f-a9de-4aa6-a73a-b63704bae482]
   Why does this exception occur? It has nothing to do with the module I submitted, and I have already merged the latest code.




[GitHub] [seatunnel] lightzhao commented on pull request #4382: [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector.

Posted by "lightzhao (via GitHub)" <gi...@apache.org>.
lightzhao commented on PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#issuecomment-1641853037

   @TyrantLucifer @hailin0 @EricJoy2048 @Hisoka-X PTAL.




Re: [PR] [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. [seatunnel]

Posted by "Carl-Zhou-CN (via GitHub)" <gi...@apache.org>.
Carl-Zhou-CN commented on code in PR #4382:
URL: https://github.com/apache/seatunnel/pull/4382#discussion_r1430810074


##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PARTITION_KEY_FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.SEMANTICS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TEXT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TRANSACTION_TIMEOUT;
+
+public class PulsarSinkWriter
+        implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
+
+    private Context context;
+    private Producer<byte[]> producer;
+    private PulsarClient pulsarClient;
+    private SerializationSchema serializationSchema;
+    private SerializationSchema keySerializationSchema;
+    private TransactionImpl transaction;
+    private int transactionTimeout = TRANSACTION_TIMEOUT.defaultValue();
+    private PulsarSemantics pulsarSemantics = SEMANTICS.defaultValue();
+    private final AtomicLong pendingMessages;
+
+    public PulsarSinkWriter(
+            Context context,
+            PulsarClientConfig clientConfig,
+            SeaTunnelRowType seaTunnelRowType,
+            ReadonlyConfig pluginConfig,
+            List<PulsarSinkState> pulsarStates) {
+        this.context = context;
+        String topic = pluginConfig.get(TOPIC);
+        String format = pluginConfig.get(FORMAT);
+        String delimiter = pluginConfig.get(FIELD_DELIMITER);
+        this.transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT);
+        this.pulsarSemantics = pluginConfig.get(SEMANTICS);
+        MessageRoutingMode messageRoutingMode = pluginConfig.get(MESSAGE_ROUTING_MODE);
+        this.serializationSchema = createSerializationSchema(seaTunnelRowType, format, delimiter);
+        List<String> partitionKeyList = getPartitionKeyFields(pluginConfig, seaTunnelRowType);
+        this.keySerializationSchema =
+                createKeySerializationSchema(partitionKeyList, seaTunnelRowType);
+        this.pulsarClient = PulsarConfigUtil.createClient(clientConfig, pulsarSemantics);
+
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Failed to create Pulsar transaction.",
+                        e);
+            }
+        }
+        try {
+            this.producer =
+                    PulsarConfigUtil.createProducer(
+                            pulsarClient, topic, pulsarSemantics, pluginConfig, messageRoutingMode);
+        } catch (PulsarClientException e) {
+            throw new PulsarConnectorException(
+                    PulsarConnectorErrorCode.CREATE_PRODUCER_FAILED,
+                    "Failed to create Pulsar producer.",
+                    e);
+        }
+        this.pendingMessages = new AtomicLong(0);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        byte[] message = serializationSchema.serialize(element);
+        byte[] key = null;
+        if (keySerializationSchema != null) {
+            key = keySerializationSchema.serialize(element);
+        }
+        TypedMessageBuilder<byte[]> typedMessageBuilder =
+                PulsarConfigUtil.createTypedMessageBuilder(producer, transaction);
+        if (key != null) {
+            typedMessageBuilder.keyBytes(key);
+        }
+        typedMessageBuilder.value(message);
+        if (PulsarSemantics.NON == pulsarSemantics) {
+            typedMessageBuilder.sendAsync();
+        } else {
+            pendingMessages.incrementAndGet();
+            CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
+            future.whenComplete(
+                    (id, ex) -> {
+                        pendingMessages.decrementAndGet();
+                        if (ex != null) {
+                            throw new PulsarConnectorException(
+                                    PulsarConnectorErrorCode.SEND_MESSAGE_FAILED,
+                                    "send message failed");
+                        }
+                    });
+        }
+    }
+
+    @Override
+    public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            PulsarCommitInfo pulsarCommitInfo = new PulsarCommitInfo(this.transaction.getTxnID());
+            return Optional.of(pulsarCommitInfo);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
+        if (PulsarSemantics.NON != pulsarSemantics) {
+            // Flush pending messages and wait until all in-flight sends have completed.
+            producer.flush();
+            while (pendingMessages.longValue() > 0) {
+                producer.flush();
+            }
+        }
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            List<PulsarSinkState> pulsarSinkStates =
+                    Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Failed to create Pulsar transaction.",
+                        e);
+            }
+            return pulsarSinkStates;
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void abortPrepare() {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            transaction.abort();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        producer.close();
+        pulsarClient.close();
+    }
+
+    /**
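+     * Create the serialization schema for the message value.
+     *
+     * @param rowType row type of the upstream data
+     * @param format message format: {@code DEFAULT_FORMAT} (JSON) or {@code TEXT_FORMAT}
+     * @param delimiter field delimiter, used only by the text format
+     * @return the serialization schema for {@code format}
+     * @throws SeaTunnelJsonFormatException if {@code format} is not supported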
+     */
+    private SerializationSchema createSerializationSchema(
+            SeaTunnelRowType rowType, String format, String delimiter) {
+        if (DEFAULT_FORMAT.equals(format)) {
+            return new JsonSerializationSchema(rowType);
+        } else if (TEXT_FORMAT.equals(format)) {
+            return TextSerializationSchema.builder()
+                    .seaTunnelRowType(rowType)
+                    .delimiter(delimiter)
+                    .build();
+        } else {
+            throw new SeaTunnelJsonFormatException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+        }
+    }
+
+    /**
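+     * Create the serialization schema for the message key: the configured key fields are
+     * projected out of each row and serialized as JSON.
+     *
+     * @param keyFieldNames names of the row fields that make up the message key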

Review Comment:
   Same as above
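A note on the `whenComplete` callback in `write()`: an exception thrown inside a `whenComplete` action does not propagate to the thread that called `write()`. It only completes the future returned by `whenComplete`, and nothing observes that future. Meanwhile `pendingMessages` is still decremented, so `snapshotState()` can finish a checkpoint even though a message was never written. One common pattern is to record the first failure in the callback and rethrow it on the writer thread, for example at the start of `write()` or in `snapshotState()` once the pending count reaches zero. The sketch below uses only the JDK. `AsyncFailureTracker` and its methods are hypothetical names and are not part of SeaTunnel or the Pulsar client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper: counts in-flight async sends and remembers the first failure
 * so it can be rethrown on the writer thread instead of being lost in a callback.
 */
final class AsyncFailureTracker {
    private final AtomicLong pending = new AtomicLong();
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    /** Registers a send future; the callback records errors instead of throwing them. */
    <T> void track(CompletableFuture<T> sendFuture) {
        pending.incrementAndGet();
        sendFuture.whenComplete(
                (result, ex) -> {
                    if (ex != null) {
                        // Keep only the first error; later ones are often consequences of it.
                        firstFailure.compareAndSet(null, ex);
                    }
                    // Decrement after recording, so a caller that sees pending() == 0
                    // and then calls checkFailure() cannot miss the error.
                    pending.decrementAndGet();
                });
    }

    long pending() {
        return pending.get();
    }

    /** Rethrows a recorded failure on the calling (writer) thread. */
    void checkFailure() {
        Throwable t = firstFailure.get();
        if (t != null) {
            throw new IllegalStateException("Asynchronous send failed", t);
        }
    }

    public static void main(String[] args) {
        AsyncFailureTracker tracker = new AsyncFailureTracker();
        CompletableFuture<String> ok = new CompletableFuture<>();
        CompletableFuture<String> failed = new CompletableFuture<>();
        tracker.track(ok);
        tracker.track(failed);
        ok.complete("msg-1");
        failed.completeExceptionally(new RuntimeException("broker unavailable"));
        System.out.println("pending=" + tracker.pending()); // prints pending=0
        try {
            tracker.checkFailure();
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getCause().getMessage()); // caught: broker unavailable
        }
    }
}
```

In the writer, `checkFailure()` would replace the `throw` inside the callback. The callback itself would then only record the error and decrement the counter.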


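The `while (pendingMessages.longValue() > 0) { producer.flush(); }` loop in `snapshotState()` keeps the writer thread busy, calling `flush()` repeatedly until the send callbacks catch up. An alternative is to keep the in-flight send futures and block once on `CompletableFuture.allOf(...).join()`, which also rethrows a failed send as a `CompletionException`. In the writer, this wait would follow a single `producer.flush()`. The sketch below uses plain JDK futures in place of Pulsar's `sendAsync()` results. `InFlightSends` is a hypothetical helper, not an existing class.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: tracks in-flight sends so a checkpoint can wait for them. */
final class InFlightSends {
    private final Set<CompletableFuture<?>> inFlight = ConcurrentHashMap.newKeySet();

    void add(CompletableFuture<?> sendFuture) {
        inFlight.add(sendFuture);
        // Successful sends remove themselves; failed ones stay so awaitAll() reports them.
        sendFuture.whenComplete(
                (result, ex) -> {
                    if (ex == null) {
                        inFlight.remove(sendFuture);
                    }
                });
    }

    int size() {
        return inFlight.size();
    }

    /** Blocks until every tracked send is done; throws CompletionException if any failed. */
    void awaitAll() {
        CompletableFuture<?>[] snapshot = inFlight.toArray(new CompletableFuture<?>[0]);
        CompletableFuture.allOf(snapshot).join();
    }

    public static void main(String[] args) {
        InFlightSends sends = new InFlightSends();
        CompletableFuture<String> ok = new CompletableFuture<>();
        CompletableFuture<String> failed = new CompletableFuture<>();
        sends.add(ok);
        sends.add(failed);
        ok.complete("msg-1");
        failed.completeExceptionally(new RuntimeException("broker unavailable"));
        try {
            sends.awaitAll();
        } catch (CompletionException e) {
            System.out.println("send failed: " + e.getCause().getMessage()); // send failed: broker unavailable
        }
    }
}
```

Failed futures are never removed here. A writer that recovers from a failure would therefore need to clear them, which this sketch leaves out.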

##########
seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.DEFAULT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.MESSAGE_ROUTING_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PARTITION_KEY_FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.SEMANTICS;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TEXT_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
+import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TRANSACTION_TIMEOUT;
+
+public class PulsarSinkWriter
+        implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
+
+    private Context context;
+    private Producer<byte[]> producer;
+    private PulsarClient pulsarClient;
+    private SerializationSchema serializationSchema;
+    private SerializationSchema keySerializationSchema;
+    private TransactionImpl transaction;
+    private int transactionTimeout = TRANSACTION_TIMEOUT.defaultValue();
+    private PulsarSemantics pulsarSemantics = SEMANTICS.defaultValue();
+    private final AtomicLong pendingMessages;
+
+    public PulsarSinkWriter(
+            Context context,
+            PulsarClientConfig clientConfig,
+            SeaTunnelRowType seaTunnelRowType,
+            ReadonlyConfig pluginConfig,
+            List<PulsarSinkState> pulsarStates) {
+        this.context = context;
+        String topic = pluginConfig.get(TOPIC);
+        String format = pluginConfig.get(FORMAT);
+        String delimiter = pluginConfig.get(FIELD_DELIMITER);
+        this.transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT);
+        this.pulsarSemantics = pluginConfig.get(SEMANTICS);
+        MessageRoutingMode messageRoutingMode = pluginConfig.get(MESSAGE_ROUTING_MODE);
+        this.serializationSchema = createSerializationSchema(seaTunnelRowType, format, delimiter);
+        List<String> partitionKeyList = getPartitionKeyFields(pluginConfig, seaTunnelRowType);
+        this.keySerializationSchema =
+                createKeySerializationSchema(partitionKeyList, seaTunnelRowType);
+        this.pulsarClient = PulsarConfigUtil.createClient(clientConfig, pulsarSemantics);
+
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Failed to create Pulsar transaction.",
+                        e);
+            }
+        }
+        try {
+            this.producer =
+                    PulsarConfigUtil.createProducer(
+                            pulsarClient, topic, pulsarSemantics, pluginConfig, messageRoutingMode);
+        } catch (PulsarClientException e) {
+            throw new PulsarConnectorException(
+                    PulsarConnectorErrorCode.CREATE_PRODUCER_FAILED,
+                    "Failed to create Pulsar producer.",
+                    e);
+        }
+        this.pendingMessages = new AtomicLong(0);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        byte[] message = serializationSchema.serialize(element);
+        byte[] key = null;
+        if (keySerializationSchema != null) {
+            key = keySerializationSchema.serialize(element);
+        }
+        TypedMessageBuilder<byte[]> typedMessageBuilder =
+                PulsarConfigUtil.createTypedMessageBuilder(producer, transaction);
+        if (key != null) {
+            typedMessageBuilder.keyBytes(key);
+        }
+        typedMessageBuilder.value(message);
+        if (PulsarSemantics.NON == pulsarSemantics) {
+            typedMessageBuilder.sendAsync();
+        } else {
+            pendingMessages.incrementAndGet();
+            CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
+            future.whenComplete(
+                    (id, ex) -> {
+                        pendingMessages.decrementAndGet();
+                        if (ex != null) {
+                            throw new PulsarConnectorException(
+                                    PulsarConnectorErrorCode.SEND_MESSAGE_FAILED,
+                                    "send message failed");
+                        }
+                    });
+        }
+    }
+
+    @Override
+    public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            PulsarCommitInfo pulsarCommitInfo = new PulsarCommitInfo(this.transaction.getTxnID());
+            return Optional.of(pulsarCommitInfo);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
+        if (PulsarSemantics.NON != pulsarSemantics) {
+            // Flush pending messages and wait until all in-flight sends have completed.
+            producer.flush();
+            while (pendingMessages.longValue() > 0) {
+                producer.flush();
+            }
+        }
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            List<PulsarSinkState> pulsarSinkStates =
+                    Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
+            try {
+                this.transaction =
+                        (TransactionImpl)
+                                PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
+            } catch (Exception e) {
+                throw new PulsarConnectorException(
+                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
+                        "Failed to create Pulsar transaction.",
+                        e);
+            }
+            return pulsarSinkStates;
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void abortPrepare() {
+        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
+            transaction.abort();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        producer.close();
+        pulsarClient.close();
+    }
+
+    /**
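+     * Create the serialization schema for the message value.
+     *
+     * @param rowType row type of the upstream data
+     * @param format message format: {@code DEFAULT_FORMAT} (JSON) or {@code TEXT_FORMAT}
+     * @param delimiter field delimiter, used only by the text format
+     * @return the serialization schema for {@code format}
+     * @throws SeaTunnelJsonFormatException if {@code format} is not supported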
+     */
+    private SerializationSchema createSerializationSchema(
+            SeaTunnelRowType rowType, String format, String delimiter) {
+        if (DEFAULT_FORMAT.equals(format)) {
+            return new JsonSerializationSchema(rowType);
+        } else if (TEXT_FORMAT.equals(format)) {
+            return TextSerializationSchema.builder()
+                    .seaTunnelRowType(rowType)
+                    .delimiter(delimiter)
+                    .build();
+        } else {
+            throw new SeaTunnelJsonFormatException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+        }
+    }
+
+    /**
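+     * Create the serialization schema for the message key: the configured key fields are
+     * projected out of each row and serialized as JSON.
+     *
+     * @param keyFieldNames names of the row fields that make up the message key
+     * @param seaTunnelRowType row type of the upstream data
+     * @return the key serialization schema, or {@code null} if no key fields are configured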
+     */
+    public static SerializationSchema createKeySerializationSchema(
+            List<String> keyFieldNames, SeaTunnelRowType seaTunnelRowType) {
+        if (keyFieldNames == null || keyFieldNames.isEmpty()) {
+            return null;
+        }
+        int[] keyFieldIndexArr = new int[keyFieldNames.size()];
+        SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()];
+        for (int i = 0; i < keyFieldNames.size(); i++) {
+            String keyFieldName = keyFieldNames.get(i);
+            int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
+            keyFieldIndexArr[i] = rowFieldIndex;
+            keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
+        }
+        SeaTunnelRowType keyType =
+                new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
+        SerializationSchema keySerializationSchema = new JsonSerializationSchema(keyType);
+
+        Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor =
+                row -> {
+                    Object[] keyFields = new Object[keyFieldIndexArr.length];
+                    for (int i = 0; i < keyFieldIndexArr.length; i++) {
+                        keyFields[i] = row.getField(keyFieldIndexArr[i]);
+                    }
+                    return new SeaTunnelRow(keyFields);
+                };
+        return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
+    }
+
+    /**
+     * Get the partition key fields configured for the sink.
+     *
+     * @param pluginConfig sink plugin configuration

Review Comment:
   Same as above
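`createKeySerializationSchema` first resolves each key field name to its index in the row type. Then, for every row, it copies those fields into a smaller row that is serialized as JSON. The projection step can be sketched without SeaTunnel types, using `Object[]` for a row and a list of field names for the row type. `KeyProjector` is a hypothetical name. Rejecting unknown field names with `IllegalArgumentException` is this sketch's choice, not necessarily what `SeaTunnelRowType.indexOf` does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of the key-field projection used to build a message key. */
final class KeyProjector {
    /** Resolves key field names to positions once, then returns a per-row projection. */
    static Function<Object[], Object[]> create(List<String> fieldNames, List<String> keyFieldNames) {
        int[] keyIndexes = new int[keyFieldNames.size()];
        for (int i = 0; i < keyIndexes.length; i++) {
            int idx = fieldNames.indexOf(keyFieldNames.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown key field: " + keyFieldNames.get(i));
            }
            keyIndexes[i] = idx;
        }
        // Per row: copy only the key fields, in the order the key fields were listed.
        return row -> {
            Object[] key = new Object[keyIndexes.length];
            for (int i = 0; i < keyIndexes.length; i++) {
                key[i] = row[keyIndexes[i]];
            }
            return key;
        };
    }

    public static void main(String[] args) {
        Function<Object[], Object[]> project =
                create(Arrays.asList("id", "name", "age"), Arrays.asList("age", "id"));
        System.out.println(Arrays.toString(project.apply(new Object[] {1, "Alice", 30})));
        // prints [30, 1]
    }
}
```

Resolving indexes once, outside the returned function, keeps the per-row work to a small array copy. The connector's version follows the same split between `keyFieldIndexArr` and `keyDataExtractor`.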


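Suppose the constructor declares `Integer transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT);` and `PulsarSemantics pulsarSemantics = pluginConfig.get(SEMANTICS);` as local variables. These locals shadow the fields of the same name, so the fields are never assigned. As a result, `write()`, `prepareCommit()`, `snapshotState()` and `abortPrepare()` keep using `SEMANTICS.defaultValue()`, and `snapshotState()` keeps using `TRANSACTION_TIMEOUT.defaultValue()`, whatever the user configured. The minimal demonstration below uses a stand-in `Semantics` enum; it is hypothetical and is not the connector's `PulsarSemantics`.

```java
/** Hypothetical demo of a constructor local shadowing a field. */
final class WriterConfigDemo {
    enum Semantics { NON, AT_LEAST_ONCE, EXACTLY_ONCE }

    static final class Buggy {
        private Semantics semantics = Semantics.NON; // field default

        Buggy(Semantics configured) {
            // Declares a new local variable; the field keeps its default.
            Semantics semantics = configured;
        }

        Semantics effective() {
            return semantics;
        }
    }

    static final class Fixed {
        private Semantics semantics = Semantics.NON; // field default

        Fixed(Semantics configured) {
            // Assigns the field, so other methods see the configured value.
            this.semantics = configured;
        }

        Semantics effective() {
            return semantics;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Buggy(Semantics.EXACTLY_ONCE).effective()); // prints NON
        System.out.println(new Fixed(Semantics.EXACTLY_ONCE).effective()); // prints EXACTLY_ONCE
    }
}
```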

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org