You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/04/14 03:13:43 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add rocketmq source and sink (#4007)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e33389755 [Feature][Connector-V2] Add rocketmq source and sink (#4007)
e33389755 is described below
commit e333897552e4fd03bda1688c26251d1d674fc318
Author: Xiaojian Sun <su...@163.com>
AuthorDate: Fri Apr 14 11:13:35 2023 +0800
[Feature][Connector-V2] Add rocketmq source and sink (#4007)
---
.../connector-v2/Error-Quick-Reference-Manual.md | 16 +
docs/en/connector-v2/sink/RocketMQ.md | 82 +++++
docs/en/connector-v2/source/RocketMQ.md | 142 ++++++++
plugin-mapping.properties | 3 +-
release-note.md | 1 +
seatunnel-connectors-v2/connector-rocketmq/pom.xml | 61 ++++
.../rocketmq/common/RocketMqAdminUtil.java | 319 ++++++++++++++++++
.../rocketmq/common/RocketMqBaseConfiguration.java | 255 ++++++++++++++
.../seatunnel/rocketmq/common/SchemaFormat.java | 48 +++
.../seatunnel/rocketmq/common/StartMode.java | 27 ++
.../seatunnel/rocketmq/config/Config.java | 68 ++++
.../seatunnel/rocketmq/config/ConsumerConfig.java | 89 +++++
.../seatunnel/rocketmq/config/ProducerConfig.java | 71 ++++
.../exception/RocketMqConnectorErrorCode.java | 71 ++++
.../exception/RocketMqConnectorException.java | 36 ++
.../serialize/DefaultSeaTunnelRowSerializer.java | 127 +++++++
.../rocketmq/serialize/SeaTunnelRowSerializer.java | 33 ++
.../seatunnel/rocketmq/sink/ProducerMetadata.java | 47 +++
.../rocketmq/sink/RocketMqNoTransactionSender.java | 102 ++++++
.../rocketmq/sink/RocketMqProducerSender.java | 26 ++
.../seatunnel/rocketmq/sink/RocketMqSink.java | 170 ++++++++++
.../rocketmq/sink/RocketMqSinkFactory.java | 48 +++
.../rocketmq/sink/RocketMqSinkWriter.java | 102 ++++++
.../rocketmq/sink/RocketMqTransactionSender.java | 80 +++++
.../rocketmq/source/ConsumerMetadata.java | 40 +++
.../rocketmq/source/RocketMqConsumerThread.java | 69 ++++
.../seatunnel/rocketmq/source/RocketMqSource.java | 288 ++++++++++++++++
.../rocketmq/source/RocketMqSourceFactory.java | 66 ++++
.../rocketmq/source/RocketMqSourceReader.java | 259 ++++++++++++++
.../rocketmq/source/RocketMqSourceSplit.java | 79 +++++
.../source/RocketMqSourceSplitEnumerator.java | 352 +++++++++++++++++++
.../rocketmq/source/RocketMqSourceState.java | 38 +++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 7 +
.../{ => connector-rocketmq-e2e}/pom.xml | 60 +---
.../e2e/connector/rocketmq/RocketMqContainer.java | 102 ++++++
.../e2e/connector/rocketmq/RocketMqIT.java | 372 +++++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 36 ++
.../resources/rocketmq-sink_fake_to_rocketmq.conf | 69 ++++
.../resources/rocketmq-source_json_to_console.conf | 91 +++++
.../resources/rocketmq-source_text_to_console.conf | 91 +++++
.../rocketmq-text-sink_fake_to_rocketmq.conf | 70 ++++
.../rocketmq_source_earliest_to_console.conf | 69 ++++
.../rocketmq_source_group_offset_to_console.conf | 69 ++++
.../rocketmq_source_latest_to_console.conf | 68 ++++
...ocketmq_source_specific_offsets_to_console.conf | 73 ++++
.../rocketmq_source_timestamp_to_console.conf | 72 ++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
48 files changed, 4419 insertions(+), 47 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 29bb1adbf..7c137264b 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -252,3 +252,19 @@ problems encountered by users.
|---------------------------|------------------------|-------------------------|
| FILTER_FIELD_TRANSFORM-01 | filter field not found | filter field not found. |
+## RocketMq Connector Error Codes
+
+| code | description | solution |
+|-------------|-------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
+| ROCKETMQ-01 | Add a split back to the split enumerator failed, it will only happen when a SourceReader failed | When users encounter this error code, it means that add a split back to the split enumerator failed, please check it. |
+| ROCKETMQ-02 | Add the split checkpoint state to reader failed | When users encounter this error code, it means that add the split checkpoint state to reader failed, please check it. |
+| ROCKETMQ-03 | Rocketmq failed to consume data | When users encounter this error code, it means that rocketmq failed to consume data, please check it. |
+| ROCKETMQ-04 | Error occurred when the rocketmq consumer thread was running | When the user encounters this error code, it means that an error occurred while running the Rocketmq consumer thread |
+| ROCKETMQ-05 | Rocketmq producer failed to send message | When users encounter this error code, it means that Rocketmq producer failed to send message, please check it. |
+| ROCKETMQ-06 | Rocketmq producer failed to start | When users encounter this error code, it means that Rocketmq producer failed to start, please check it. |
+| ROCKETMQ-07 | Rocketmq consumer failed to start | When users encounter this error code, it means that Rocketmq consumer failed to start, please check it. |
+| ROCKETMQ-08 | Unsupported start mode | When users encounter this error code, it means that the configured start mode is not supported, please check it. |
+| ROCKETMQ-09 | Failed to get the offsets of the current consumer group | When users encounter this error code, it means that failed to get the offsets of the current consumer group, please check it. |
+| ROCKETMQ-10 | Failed to search offset through timestamp | When users encounter this error code, it means that failed to search offset through timestamp, please check it. |
+| ROCKETMQ-11 | Failed to get the min and max offsets of the topic | When users encounter this error code, it means that failed to get the min and max offsets of the topic, please check it. |
+
diff --git a/docs/en/connector-v2/sink/RocketMQ.md b/docs/en/connector-v2/sink/RocketMQ.md
new file mode 100644
index 000000000..703192021
--- /dev/null
+++ b/docs/en/connector-v2/sink/RocketMQ.md
@@ -0,0 +1,82 @@
+# RocketMQ
+
+> RocketMQ sink connector
+
+## Description
+
+Write rows to an Apache RocketMQ topic.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+When `semantic` is set to `EXACTLY_ONCE`, 2PC is used to guarantee that each message is sent to RocketMQ exactly once. The default semantic is `NON`.
+
+## Options
+
+| name | type | required | default value |
+|----------------------|---------|----------|--------------------------|
+| topic | string | yes | - |
+| name.srv.addr | string | yes | - |
+| acl.enabled | Boolean | no | false |
+| access.key | String | no | |
+| secret.key | String | no | |
+| producer.group | String | no | SeaTunnel-producer-Group |
+| semantic | string | no | NON |
+| partition.key.fields | array | no | - |
+| format | String | no | json |
+| field.delimiter | String | no | , |
+| common-options | config | no | - |
+
+### topic [string]
+
+`RocketMQ topic` name.
+
+### name.srv.addr [string]
+
+`RocketMQ` name server cluster address.
+
+### semantic [string]
+
+Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.
+
+### partition.key.fields [array]
+
+Configure which fields are used as the key of the RocketMQ message.
+
+For example, if you want to use value of fields from upstream data as key, you can assign field names to this property.
+
+Upstream data is the following:
+
+| name | age | data |
+|------|-----|---------------|
+| Jack | 16 | data-example1 |
+| Mary | 23 | data-example2 |
+
+If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.
+
+### format
+
+Data format. The default format is json; the text format is also supported. The default field separator is ",".
+If you customize the delimiter, add the "field.delimiter" option.
+
+### field.delimiter
+
+Customize the field delimiter for data format.
+
+### common options [config]
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+sink {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ topic = "test-topic-003"
+ partition.key.fields = ["name"]
+ }
+}
+```
+
diff --git a/docs/en/connector-v2/source/RocketMQ.md b/docs/en/connector-v2/source/RocketMQ.md
new file mode 100644
index 000000000..fd209ce70
--- /dev/null
+++ b/docs/en/connector-v2/source/RocketMQ.md
@@ -0,0 +1,142 @@
+# RocketMQ
+
+> RocketMQ source connector
+
+## Description
+
+Source connector for Apache RocketMQ.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+|-------------------------------------|---------|----------|----------------------------|
+| topics | String | yes | - |
+| name.srv.addr | String | yes | - |
+| acl.enabled | Boolean | no | false |
+| access.key | String | no | |
+| secret.key | String | no | |
+| batch.size | int | no | 100 |
+| consumer.group | String | no | SeaTunnel-Consumer-Group |
+| commit.on.checkpoint | Boolean | no | true |
+| schema | | no | - |
+| format | String | no | json |
+| field.delimiter | String | no | , |
+| start.mode | String | no | CONSUME_FROM_GROUP_OFFSETS |
+| start.mode.offsets | | no | |
+| start.mode.timestamp | Long | no | |
+| partition.discovery.interval.millis | long | no | -1 |
+| common-options | config | no | - |
+
+### topics [string]
+
+`RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`.
+
+### name.srv.addr [string]
+
+`RocketMQ` name server cluster address.
+
+### consumer.group [string]
+
+`RocketMQ consumer group id`, used to distinguish different consumer groups.
+
+### acl.enabled [boolean]
+
+If true, access control is enabled, and access key and secret key need to be configured.
+
+### access.key [string]
+
+When ACL_ENABLED is true, access key cannot be empty.
+
+### secret.key [string]
+
+When ACL_ENABLED is true, secret key cannot be empty.
+
+### batch.size [int]
+
+`RocketMQ` consumer pull batch size
+
+### commit.on.checkpoint [boolean]
+
+If true the consumer's offset will be periodically committed in the background.
+
+### partition.discovery.interval.millis [long]
+
+The interval for dynamically discovering topics and partitions.
+
+### schema
+
+The structure of the data, including field names and field types.
+
+### format
+
+Data format. The default format is json; the text format is also supported. The default field separator is ",".
+If you customize the delimiter, add the "field.delimiter" option.
+
+### field.delimiter
+
+Customize the field delimiter for data format.
+
+### start.mode
+
+The initial consumption mode of the consumer. The supported values are:
+[CONSUME_FROM_LAST_OFFSET], [CONSUME_FROM_FIRST_OFFSET], [CONSUME_FROM_GROUP_OFFSETS], [CONSUME_FROM_TIMESTAMP],
+[CONSUME_FROM_SPECIFIC_OFFSETS]
+
+### start.mode.timestamp
+
+The timestamp to start consuming from. Required when `start.mode` is "CONSUME_FROM_TIMESTAMP".
+
+### start.mode.offsets
+
+The offsets to start consuming from. Required when `start.mode` is "CONSUME_FROM_SPECIFIC_OFFSETS".
+
+for example:
+
+```hocon
+start.mode.offsets = {
+ topic1-0 = 70
+ topic1-1 = 10
+ topic1-2 = 10
+ }
+```
+
+### common-options [config]
+
+Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+
+## Example
+
+### Simple
+
+```hocon
+source {
+ Rocketmq {
+ name.srv.addr = "localhost:9876"
+ topics = "test-topic-002"
+ consumer.group = "consumer-group"
+ parallelism = 2
+ batch.size = 20
+ schema = {
+ fields {
+ age = int
+ name = string
+ }
+ }
+ start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
+ start.mode.offsets = {
+ test-topic-002-0 = 20
+ }
+
+ }
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index d72508717..87debbe7e 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -105,4 +105,5 @@ seatunnel.source.Persistiq = connector-http-persistiq
seatunnel.sink.SelectDBCloud = connector-selectdb-cloud
seatunnel.sink.Hbase = connector-hbase
seatunnel.source.StarRocks = connector-starrocks
-
+seatunnel.source.Rocketmq = connector-rocketmq
+seatunnel.sink.Rocketmq = connector-rocketmq
diff --git a/release-note.md b/release-note.md
index 65ae1abab..5554c24d8 100644
--- a/release-note.md
+++ b/release-note.md
@@ -19,6 +19,7 @@
- [Hbase] Add hbase sink connector #4049
- [Github] Add Github source connector #4155
- [CDC] Support export debezium-json format to kafka #4339
+- [RocketMQ] Add RocketMQ source and sink connector #4007
### Formats
- [Canal]Support read canal format message #3950
diff --git a/seatunnel-connectors-v2/connector-rocketmq/pom.xml b/seatunnel-connectors-v2/connector-rocketmq/pom.xml
new file mode 100644
index 000000000..5076eaaad
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <artifactId>connector-rocketmq</artifactId>
+
+ <properties>
+ <rocketmq.version>4.9.4</rocketmq.version>
+ </properties>
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-text</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
new file mode 100644
index 000000000..ee831257a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/** Tools for creating RocketMq topic and group. */
+public class RocketMqAdminUtil {
+
+ public static String createUniqInstance(String prefix) {
+ return prefix.concat("-").concat(UUID.randomUUID().toString());
+ }
+
+ public static RPCHook getAclRpcHook(String accessKey, String secretKey) {
+ return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+ }
+
+ /**
+  * Creates and configures a lite pull consumer from the given configuration; the consumer is
+  * not started here.
+  *
+  * <p>An ACL hook is attached when the access key or the secret key is non-blank. The instance
+  * name and the unit name are both set to one unique name derived from the name server
+  * address. The pull batch size is applied only when it is configured.
+  *
+  * @param config connection and consumer settings
+  * @param autoCommit value passed to {@code setAutoCommit}
+  * @return the configured consumer
+  */
+ public static DefaultLitePullConsumer initDefaultLitePullConsumer(
+         RocketMqBaseConfiguration config, boolean autoCommit) {
+     RPCHook aclHook = null;
+     if (StringUtils.isNotBlank(config.getAccessKey())
+             || StringUtils.isNotBlank(config.getSecretKey())) {
+         aclHook = getAclRpcHook(config.getAccessKey(), config.getSecretKey());
+     }
+     DefaultLitePullConsumer consumer =
+             Objects.isNull(aclHook)
+                     ? new DefaultLitePullConsumer(config.getGroupId())
+                     : new DefaultLitePullConsumer(config.getGroupId(), aclHook);
+
+     String uniqueName = createUniqInstance(config.getNamesrvAddr());
+     consumer.setNamesrvAddr(config.getNamesrvAddr());
+     consumer.setInstanceName(uniqueName);
+     consumer.setUnitName(uniqueName);
+     consumer.setAutoCommit(autoCommit);
+
+     Integer pullBatchSize = config.getBatchSize();
+     if (pullBatchSize != null) {
+         consumer.setPullBatchSize(pullBatchSize);
+     }
+     return consumer;
+ }
+
+ /**
+  * Creates and configures a transactional producer from the given configuration; the
+  * producer is not started here.
+  *
+  * <p>An ACL hook is attached only when {@code config.isAclEnable()} is true. The maximum
+  * message size and the send timeout are applied only when they are configured.
+  *
+  * @param config connection and producer settings
+  * @param listener transaction listener registered on the producer
+  * @return the configured transactional producer
+  */
+ public static TransactionMQProducer initTransactionMqProducer(
+         RocketMqBaseConfiguration config, TransactionListener listener) {
+     RPCHook aclHook =
+             config.isAclEnable()
+                     ? getAclRpcHook(config.getAccessKey(), config.getSecretKey())
+                     : null;
+     TransactionMQProducer txProducer =
+             new TransactionMQProducer(config.getGroupId(), aclHook);
+     txProducer.setNamesrvAddr(config.getNamesrvAddr());
+     txProducer.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
+     txProducer.setLanguage(LanguageCode.JAVA);
+     txProducer.setTransactionListener(listener);
+
+     Integer maxMessageSize = config.getMaxMessageSize();
+     if (maxMessageSize != null) {
+         txProducer.setMaxMessageSize(maxMessageSize);
+     }
+     Integer sendMsgTimeout = config.getSendMsgTimeout();
+     if (sendMsgTimeout != null) {
+         txProducer.setSendMsgTimeout(sendMsgTimeout);
+     }
+     return txProducer;
+ }
+
+ /**
+  * Creates and configures a non-transactional producer from the given configuration; the
+  * producer is not started here.
+  *
+  * <p>An ACL hook is attached only when {@code config.isAclEnable()} is true. The maximum
+  * message size and the send timeout are each applied only when configured and positive.
+  *
+  * @param config connection and producer settings
+  * @return the configured producer
+  */
+ public static DefaultMQProducer initDefaultMqProducer(RocketMqBaseConfiguration config) {
+     RPCHook rpcHook = null;
+     if (config.isAclEnable()) {
+         rpcHook =
+                 new AclClientRPCHook(
+                         new SessionCredentials(config.getAccessKey(), config.getSecretKey()));
+     }
+     DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
+     producer.setNamesrvAddr(config.getNamesrvAddr());
+     producer.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
+     producer.setProducerGroup(config.getGroupId());
+     producer.setLanguage(LanguageCode.JAVA);
+
+     Integer maxMessageSize = config.getMaxMessageSize();
+     if (maxMessageSize != null && maxMessageSize > 0) {
+         producer.setMaxMessageSize(maxMessageSize);
+     }
+     // Validate the timeout against itself. The previous guard tested the max message size,
+     // which ignored the timeout's own value and threw a NullPointerException when only the
+     // timeout was configured.
+     Integer sendMsgTimeout = config.getSendMsgTimeout();
+     if (sendMsgTimeout != null && sendMsgTimeout > 0) {
+         producer.setSendMsgTimeout(sendMsgTimeout);
+     }
+     return producer;
+ }
+
+ /**
+  * Creates an admin client for the configured name server and starts it.
+  *
+  * <p>An ACL hook is attached only when {@code config.isAclEnable()} is true. Callers are
+  * responsible for calling {@code shutdown()} on the returned client.
+  *
+  * @param config connection settings
+  * @return the started admin client
+  * @throws MQClientException if the admin client fails to start
+  */
+ private static DefaultMQAdminExt startMQAdminTool(RocketMqBaseConfiguration config)
+         throws MQClientException {
+     DefaultMQAdminExt adminExt =
+             config.isAclEnable()
+                     ? new DefaultMQAdminExt(
+                             new AclClientRPCHook(
+                                     new SessionCredentials(
+                                             config.getAccessKey(), config.getSecretKey())))
+                     : new DefaultMQAdminExt();
+     String nameServer = config.getNamesrvAddr();
+     adminExt.setNamesrvAddr(nameServer);
+     adminExt.setAdminExtGroup(config.getGroupId());
+     adminExt.setInstanceName(createUniqInstance(nameServer));
+     adminExt.start();
+     return adminExt;
+ }
+
+ /**
+  * Creates (or updates) the topic on every master broker of every cluster known to the name
+  * server.
+  *
+  * @param config connection settings
+  * @param topicConfig topic definition to apply
+  * @throws RocketMqConnectorException with {@code CREATE_TOPIC_ERROR} if any step fails
+  */
+ public static void createTopic(RocketMqBaseConfiguration config, TopicConfig topicConfig) {
+     DefaultMQAdminExt adminExt = null;
+     try {
+         adminExt = startMQAdminTool(config);
+         HashMap<String, Set<String>> addrTable =
+                 adminExt.examineBrokerClusterInfo().getClusterAddrTable();
+         for (String cluster : addrTable.keySet()) {
+             Set<String> masterAddrs =
+                     CommandUtil.fetchMasterAddrByClusterName(adminExt, cluster);
+             for (String masterAddr : masterAddrs) {
+                 adminExt.createAndUpdateTopicConfig(masterAddr, topicConfig);
+             }
+         }
+     } catch (Exception e) {
+         throw new RocketMqConnectorException(RocketMqConnectorErrorCode.CREATE_TOPIC_ERROR, e);
+     } finally {
+         // Always release the admin client, whether or not the topic was created.
+         if (adminExt != null) {
+             adminExt.shutdown();
+         }
+     }
+ }
+
+ /**
+  * Checks whether route information exists for the given topic.
+  *
+  * @param config connection settings
+  * @param topic topic name to look up
+  * @return {@code true} if route data was returned; {@code false} if it was {@code null} or
+  *     the lookup failed with response code {@code TOPIC_NOT_EXIST}
+  * @throws RocketMqConnectorException with {@code TOPIC_NOT_EXIST_ERROR} for any other failure
+  */
+ public static boolean topicExist(RocketMqBaseConfiguration config, String topic) {
+     DefaultMQAdminExt adminExt = null;
+     try {
+         adminExt = startMQAdminTool(config);
+         TopicRouteData routeData = adminExt.examineTopicRouteInfo(topic);
+         return routeData != null;
+     } catch (Exception e) {
+         boolean topicMissing =
+                 e instanceof MQClientException
+                         && ((MQClientException) e).getResponseCode()
+                                 == ResponseCode.TOPIC_NOT_EXIST;
+         if (topicMissing) {
+             return false;
+         }
+         throw new RocketMqConnectorException(
+                 RocketMqConnectorErrorCode.TOPIC_NOT_EXIST_ERROR, e);
+     } finally {
+         if (adminExt != null) {
+             adminExt.shutdown();
+         }
+     }
+ }
+
+ /**
+  * Fetches the per-queue offset table of each given topic.
+  *
+  * @param config connection settings
+  * @param topics topics to examine
+  * @return one offset table per topic, in the iteration order of {@code topics}
+  * @throws RocketMqConnectorException with {@code GET_MIN_AND_MAX_OFFSETS_ERROR} if the topic
+  *     statistics cannot be fetched; if the cause was an interrupt, the thread's interrupt
+  *     flag is set again before throwing
+  */
+ public static List<Map<MessageQueue, TopicOffset>> offsetTopics(
+         RocketMqBaseConfiguration config, List<String> topics) {
+     List<Map<MessageQueue, TopicOffset>> offsets = Lists.newArrayList();
+     DefaultMQAdminExt adminClient = null;
+     try {
+         adminClient = RocketMqAdminUtil.startMQAdminTool(config);
+         for (String topic : topics) {
+             TopicStatsTable topicStatsTable = adminClient.examineTopicStats(topic);
+             offsets.add(topicStatsTable.getOffsetTable());
+         }
+         return offsets;
+     } catch (MQClientException
+             | MQBrokerException
+             | RemotingException
+             | InterruptedException e) {
+         if (e instanceof InterruptedException) {
+             // Catching InterruptedException clears the flag; restore it so callers further
+             // up the stack can still observe the interruption.
+             Thread.currentThread().interrupt();
+         }
+         throw new RocketMqConnectorException(
+                 RocketMqConnectorErrorCode.GET_MIN_AND_MAX_OFFSETS_ERROR, e);
+     } finally {
+         if (adminClient != null) {
+             adminClient.shutdown();
+         }
+     }
+ }
+
+ /**
+  * Merges the per-topic offset tables returned by {@link #offsetTopics} into a single map.
+  *
+  * @param config connection settings
+  * @param topics topics to examine
+  * @return one map holding the offsets of every queue of every given topic
+  */
+ public static Map<MessageQueue, TopicOffset> flatOffsetTopics(
+         RocketMqBaseConfiguration config, List<String> topics) {
+     Map<MessageQueue, TopicOffset> flattened = Maps.newConcurrentMap();
+     for (Map<MessageQueue, TopicOffset> perTopic : offsetTopics(config, topics)) {
+         flattened.putAll(perTopic);
+     }
+     return flattened;
+ }
+
+ /**
+  * Looks up, for each given queue, the offset the broker reports for the given timestamp.
+  *
+  * @param config connection settings
+  * @param messageQueues queues to search
+  * @param timestamp timestamp passed to {@code searchOffset} for every queue
+  * @return the offset found for each queue
+  * @throws RocketMqConnectorException with {@code GET_CONSUMER_GROUP_OFFSETS_TIMESTAMP_ERROR}
+  *     if the admin client reports an {@link MQClientException}
+  */
+ public static Map<MessageQueue, Long> searchOffsetsByTimestamp(
+         RocketMqBaseConfiguration config,
+         Collection<MessageQueue> messageQueues,
+         Long timestamp) {
+     Map<MessageQueue, Long> offsetsByQueue = Maps.newConcurrentMap();
+     DefaultMQAdminExt admin = null;
+     try {
+         admin = RocketMqAdminUtil.startMQAdminTool(config);
+         for (MessageQueue queue : messageQueues) {
+             offsetsByQueue.put(queue, admin.searchOffset(queue, timestamp));
+         }
+         return offsetsByQueue;
+     } catch (MQClientException e) {
+         throw new RocketMqConnectorException(
+                 RocketMqConnectorErrorCode.GET_CONSUMER_GROUP_OFFSETS_TIMESTAMP_ERROR, e);
+     } finally {
+         if (admin != null) {
+             admin.shutdown();
+         }
+     }
+ }
+
+ /**
+  * Returns the consumer offsets of the configured consumer group, restricted to the given
+  * queues.
+  *
+  * @param config connection settings; {@code config.getGroupId()} names the consumer group
+  * @param topics topics whose consume statistics are fetched
+  * @param messageQueues queues to keep in the result
+  * @return the consumer offset of each requested queue that has consume statistics, or an
+  *     empty map if the lookup failed with response code {@code TOPIC_NOT_EXIST}
+  * @throws RocketMqConnectorException with {@code GET_CONSUMER_GROUP_OFFSETS_ERROR} for any
+  *     other failure; if the cause was an interrupt, the thread's interrupt flag is set again
+  *     before throwing
+  */
+ public static Map<MessageQueue, Long> currentOffsets(
+         RocketMqBaseConfiguration config,
+         List<String> topics,
+         Set<MessageQueue> messageQueues) {
+     DefaultMQAdminExt adminClient = null;
+     try {
+         adminClient = RocketMqAdminUtil.startMQAdminTool(config);
+         Map<MessageQueue, OffsetWrapper> consumerOffsets = Maps.newConcurrentMap();
+         for (String topic : topics) {
+             ConsumeStats consumeStats =
+                     adminClient.examineConsumeStats(config.getGroupId(), topic);
+             consumerOffsets.putAll(consumeStats.getOffsetTable());
+         }
+         return consumerOffsets.entrySet().stream()
+                 .filter(entry -> messageQueues.contains(entry.getKey()))
+                 .collect(
+                         Collectors.toMap(
+                                 Map.Entry::getKey,
+                                 entry -> entry.getValue().getConsumerOffset()));
+     } catch (MQClientException
+             | MQBrokerException
+             | RemotingException
+             | InterruptedException e) {
+         if (e instanceof InterruptedException) {
+             // Catching InterruptedException clears the flag; restore it so callers further
+             // up the stack can still observe the interruption.
+             Thread.currentThread().interrupt();
+         }
+         if (e instanceof MQClientException
+                 && ((MQClientException) e).getResponseCode()
+                         == ResponseCode.TOPIC_NOT_EXIST) {
+             return Collections.emptyMap();
+         }
+         throw new RocketMqConnectorException(
+                 RocketMqConnectorErrorCode.GET_CONSUMER_GROUP_OFFSETS_ERROR, e);
+     } finally {
+         if (adminClient != null) {
+             adminClient.shutdown();
+         }
+     }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqBaseConfiguration.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqBaseConfiguration.java
new file mode 100644
index 000000000..eba809d65
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqBaseConfiguration.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Configuration for connecting RocketMq */
+@Setter
+@Getter
+public class RocketMqBaseConfiguration implements Serializable {
+ private String namesrvAddr;
+ private String groupId;
+ /** set acl config */
+ private boolean aclEnable;
+
+ private String accessKey;
+ private String secretKey;
+
+ // consumer
+ private Integer batchSize;
+ private Long pollTimeoutMillis;
+
+ // producer
+ private Integer maxMessageSize;
+ private Integer sendMsgTimeout;
+
+ /**
+  * Creates a configuration holding only the connection settings; the consumer-specific and
+  * producer-specific fields are not assigned here.
+  */
+ private RocketMqBaseConfiguration(
+         String groupId,
+         String namesrvAddr,
+         boolean aclEnable,
+         String accessKey,
+         String secretKey) {
+     this.namesrvAddr = namesrvAddr;
+     this.groupId = groupId;
+     this.accessKey = accessKey;
+     this.secretKey = secretKey;
+     this.aclEnable = aclEnable;
+ }
+
+ /**
+  * Creates a consumer configuration: the connection settings plus the pull batch size and
+  * the poll timeout.
+  *
+  * <p>{@code pullBatchSize} is a nullable {@link Integer} so that a builder which never set a
+  * batch size does not fail with a {@link NullPointerException} on unboxing; {@code null}
+  * leaves the batch size unset.
+  */
+ private RocketMqBaseConfiguration(
+         String groupId,
+         String namesrvAddr,
+         boolean aclEnable,
+         String accessKey,
+         String secretKey,
+         Integer pullBatchSize,
+         Long consumerPullTimeoutMillis) {
+     this(groupId, namesrvAddr, aclEnable, accessKey, secretKey);
+     this.batchSize = pullBatchSize;
+     this.pollTimeoutMillis = consumerPullTimeoutMillis;
+ }
+
+ /**
+  * Creates a producer configuration: the connection settings plus the maximum message size
+  * and the send timeout.
+  *
+  * <p>Both producer settings are nullable {@link Integer}s so that a builder which never set
+  * them does not fail with a {@link NullPointerException} on unboxing; {@code null} leaves the
+  * corresponding setting unset.
+  */
+ private RocketMqBaseConfiguration(
+         String groupId,
+         String namesrvAddr,
+         boolean aclEnable,
+         String accessKey,
+         String secretKey,
+         Integer maxMessageSize,
+         Integer sendMsgTimeout) {
+     this(groupId, namesrvAddr, aclEnable, accessKey, secretKey);
+     this.maxMessageSize = maxMessageSize;
+     this.sendMsgTimeout = sendMsgTimeout;
+ }
+
+ /** Returns a new {@link Builder} for assembling a {@link RocketMqBaseConfiguration}. */
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+  * Compares all configuration fields by value.
+  *
+  * <p>The boxed {@code Integer}/{@code Long} fields are compared with {@link Objects#equals}
+  * rather than {@code ==}, which would compare object references and report equal values as
+  * different.
+  */
+ @Override
+ public boolean equals(Object o) {
+     if (this == o) {
+         return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+         return false;
+     }
+     RocketMqBaseConfiguration that = (RocketMqBaseConfiguration) o;
+     return aclEnable == that.aclEnable
+             && Objects.equals(batchSize, that.batchSize)
+             && Objects.equals(pollTimeoutMillis, that.pollTimeoutMillis)
+             && Objects.equals(maxMessageSize, that.maxMessageSize)
+             && Objects.equals(sendMsgTimeout, that.sendMsgTimeout)
+             && Objects.equals(namesrvAddr, that.namesrvAddr)
+             && Objects.equals(groupId, that.groupId)
+             && Objects.equals(accessKey, that.accessKey)
+             && Objects.equals(secretKey, that.secretKey);
+ }
+
+ /** Hashes the same nine fields that {@link #equals(Object)} compares. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ namesrvAddr,
+ groupId,
+ aclEnable,
+ accessKey,
+ secretKey,
+ batchSize,
+ pollTimeoutMillis,
+ maxMessageSize,
+ sendMsgTimeout);
+ }
+
+ @Override
+ public String toString() {
+ return "RocketMqBaseConfiguration{"
+ + "namesrvAddr='"
+ + namesrvAddr
+ + '\''
+ + ", groupId='"
+ + groupId
+ + '\''
+ + ", aclEnable="
+ + aclEnable
+ + ", accessKey='"
+ + accessKey
+ + '\''
+ + ", secretKey='"
+ + secretKey
+ + '\''
+ + ", pullBatchSize="
+ + batchSize
+ + ", pollTimeoutMillis="
+ + pollTimeoutMillis
+ + ", maxMessageSize="
+ + maxMessageSize
+ + ", sendMsgTimeout="
+ + sendMsgTimeout
+ + '}';
+ }
+
+ /** Selects which private constructor {@code Builder.build()} invokes. */
+ enum ConfigType {
+ // Only the connection settings are passed to the constructor.
+ NONE,
+ // The batch size and poll timeout are passed as well.
+ CONSUMER,
+ // The max message size and send timeout are passed as well.
+ PRODUCER
+ }
+
+ /**
+  * Fluent builder for {@link RocketMqBaseConfiguration}.
+  *
+  * <p>Call {@link #consumer()} or {@link #producer()} to choose which group of optional
+  * settings {@link #build()} passes on; without either call only the connection settings
+  * are used.
+  */
+ public static class Builder {
+     // connection
+     private String namesrvAddr;
+     private String groupId;
+     private boolean aclEnable;
+     private String accessKey;
+     private String secretKey;
+
+     // consumer
+     private Integer batchSize;
+     private Long pollTimeoutMillis;
+
+     // producer
+     private Integer maxMessageSize;
+     private Integer sendMsgTimeout;
+
+     private ConfigType configType = ConfigType.NONE;
+
+     /** Marks the configuration as a consumer configuration. */
+     public Builder consumer() {
+         configType = ConfigType.CONSUMER;
+         return this;
+     }
+
+     /** Marks the configuration as a producer configuration. */
+     public Builder producer() {
+         configType = ConfigType.PRODUCER;
+         return this;
+     }
+
+     /** Sets the name server address. */
+     public Builder namesrvAddr(String namesrvAddr) {
+         this.namesrvAddr = namesrvAddr;
+         return this;
+     }
+
+     /** Sets the consumer or producer group id. */
+     public Builder groupId(String groupId) {
+         this.groupId = groupId;
+         return this;
+     }
+
+     /** Sets whether ACL is enabled. */
+     public Builder aclEnable(boolean aclEnable) {
+         this.aclEnable = aclEnable;
+         return this;
+     }
+
+     /** Sets the ACL access key. */
+     public Builder accessKey(String accessKey) {
+         this.accessKey = accessKey;
+         return this;
+     }
+
+     /** Sets the ACL secret key. */
+     public Builder secretKey(String secretKey) {
+         this.secretKey = secretKey;
+         return this;
+     }
+
+     /** Sets the consumer batch size. */
+     public Builder batchSize(int batchSize) {
+         this.batchSize = batchSize;
+         return this;
+     }
+
+     /** Sets the consumer poll timeout in milliseconds. */
+     public Builder pollTimeoutMillis(long consumerPullTimeoutMillis) {
+         this.pollTimeoutMillis = consumerPullTimeoutMillis;
+         return this;
+     }
+
+     /** Sets the producer's maximum message size. */
+     public Builder maxMessageSize(int maxMessageSize) {
+         this.maxMessageSize = maxMessageSize;
+         return this;
+     }
+
+     /** Sets the producer's send timeout. */
+     public Builder sendMsgTimeout(int sendMsgTimeout) {
+         this.sendMsgTimeout = sendMsgTimeout;
+         return this;
+     }
+
+     /**
+      * Builds the configuration, choosing the constructor that matches the selected
+      * config type.
+      */
+     public RocketMqBaseConfiguration build() {
+         if (configType == ConfigType.CONSUMER) {
+             return new RocketMqBaseConfiguration(
+                     groupId,
+                     namesrvAddr,
+                     aclEnable,
+                     accessKey,
+                     secretKey,
+                     batchSize,
+                     pollTimeoutMillis);
+         }
+         if (configType == ConfigType.PRODUCER) {
+             return new RocketMqBaseConfiguration(
+                     groupId,
+                     namesrvAddr,
+                     aclEnable,
+                     accessKey,
+                     secretKey,
+                     maxMessageSize,
+                     sendMsgTimeout);
+         }
+         return new RocketMqBaseConfiguration(
+                 groupId, namesrvAddr, aclEnable, accessKey, secretKey);
+     }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/SchemaFormat.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/SchemaFormat.java
new file mode 100644
index 000000000..c583d7245
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/SchemaFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
+
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+ /** Schema format of the message payload, identified by a lower-case configuration name. */
+ public enum SchemaFormat {
+     JSON("json"),
+     TEXT("text");
+
+     /** Name used for this format in the connector configuration. */
+     private final String name;
+
+     SchemaFormat(String name) {
+         this.name = name;
+     }
+
+     /**
+      * Resolves a format from its configured name.
+      *
+      * <p>The comparison ignores case, so {@code "json"}, {@code "JSON"} and {@code "Json"} all
+      * resolve to {@link #JSON}. A {@code null} name matches nothing and is reported as
+      * unsupported.
+      *
+      * @param name configured format name
+      * @return the matching format
+      * @throws SeaTunnelJsonFormatException if no format has that name
+      */
+     public static SchemaFormat find(String name) {
+         for (SchemaFormat format : values()) {
+             // equalsIgnoreCase(null) is false, so null falls through to the exception below.
+             if (format.getName().equalsIgnoreCase(name)) {
+                 return format;
+             }
+         }
+         throw new SeaTunnelJsonFormatException(
+                 CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + name);
+     }
+
+     /** @return the configuration name of this format */
+     public String getName() {
+         return name;
+     }
+ }
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/StartMode.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/StartMode.java
new file mode 100644
index 000000000..a80be29ef
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/StartMode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
+
+ /**
+ * Consumer start mode: the value type of the "start.mode" option declared in ConsumerConfig.
+ *
+ * <p>NOTE(review): the per-constant notes below are read off the constant names and the option
+ * descriptions in ConsumerConfig; confirm the exact behavior against the source reader.
+ */
+ public enum StartMode {
+ /** Presumably: start from the latest offset. */
+ CONSUME_FROM_LAST_OFFSET,
+ /** Presumably: start from the earliest offset. */
+ CONSUME_FROM_FIRST_OFFSET,
+ /** Presumably: resume from the consumer group's stored offsets; default of "start.mode". */
+ CONSUME_FROM_GROUP_OFFSETS,
+ /** Start position derived from a timestamp; see the "start.mode.timestamp" option. */
+ CONSUME_FROM_TIMESTAMP,
+ /** Start position given as explicit offsets; see the "start.mode.offsets" option. */
+ CONSUME_FROM_SPECIFIC_OFFSETS,
+ }
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
new file mode 100644
index 000000000..eced97260
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+
+ /**
+  * Options shared by the RocketMq source and sink: name server address, ACL credentials and
+  * payload format. ConsumerConfig and ProducerConfig extend this class with their own options.
+  */
+ public class Config {
+
+     /** The default field delimiter is "," (a single comma, no trailing space). */
+     public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+     /** Address of the RocketMq name server; no default value. */
+     public static final Option<String> NAME_SRV_ADDR =
+             Options.key("name.srv.addr")
+                     .stringType()
+                     .noDefaultValue()
+                     .withDescription("RocketMq name server configuration center address.");
+
+     /** Whether access control is enabled; defaults to {@code false}. */
+     public static final Option<Boolean> ACL_ENABLED =
+             Options.key("acl.enabled")
+                     .booleanType()
+                     .defaultValue(false)
+                     .withDescription(
+                             "If true, access control is enabled, and access key and secret key need to be "
+                                     + "configured.");
+
+     /** Access key, needed when {@link #ACL_ENABLED} is true; no default value. */
+     public static final Option<String> ACCESS_KEY =
+             Options.key("access.key")
+                     .stringType()
+                     .noDefaultValue()
+                     .withDescription("When ACL_ENABLED is true, access key cannot be empty.");
+
+     /** Secret key, needed when {@link #ACL_ENABLED} is true; no default value. */
+     public static final Option<String> SECRET_KEY =
+             Options.key("secret.key")
+                     .stringType()
+                     .noDefaultValue()
+                     .withDescription("When ACL_ENABLED is true, secret key cannot be empty.");
+
+     /**
+      * Payload format name; defaults to the name of {@link SchemaFormat#JSON}. The description
+      * quotes the delimiter exactly as defined by {@link #DEFAULT_FIELD_DELIMITER}.
+      */
+     public static final Option<String> FORMAT =
+             Options.key("format")
+                     .stringType()
+                     .defaultValue(SchemaFormat.JSON.getName())
+                     .withDescription(
+                             "Data format. The default format is json. Optional text format. The default field separator is \",\". "
+                                     + "If you customize the delimiter, add the \"field.delimiter\" option.");
+
+     /** Custom field delimiter for the data format; no default value is declared on the option. */
+     public static final Option<String> FIELD_DELIMITER =
+             Options.key("field.delimiter")
+                     .stringType()
+                     .noDefaultValue()
+                     .withDescription("Customize the field delimiter for data format.");
+ }
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
new file mode 100644
index 000000000..534b4c8bd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+
+ /** Options that apply only to the RocketMq source (consumer side). */
+ public class ConsumerConfig extends Config {
+
+     /** Default value of {@link #BATCH_SIZE}. */
+     private static final int DEFAULT_BATCH_SIZE = 100;
+
+     /** Default value of {@link #POLL_TIMEOUT_MILLIS}. */
+     private static final long DEFAULT_POLL_TIMEOUT_MILLIS = 5000L;
+
+     /** Topic name, or several names separated by commas; no default value. */
+     public static final Option<String> TOPICS =
+             Options.key("topics")
+                     .stringType()
+                     .noDefaultValue()
+                     .withDescription(
+                             "RocketMq topic name. If there are multiple topics, use , to split, for example: "
+                                     + "\"tpc1,tpc2\".");
+
+     /** Consumer group id; no default value. */
+     public static final Option<String> CONSUMER_GROUP =
+             Options.key("consumer.group")
+                     .stringType()
+                     .noDefaultValue()
+                     .withDescription("RocketMq consumer group id.");
+
+     /** Offset-commit switch; defaults to {@code true}. */
+     public static final Option<Boolean> COMMIT_ON_CHECKPOINT =
+             Options.key("commit.on.checkpoint")
+                     .booleanType()
+                     .defaultValue(true)
+                     .withDescription(
+                             "If true, the consumer's offset will be stored in the background periodically.");
+
+     /**
+      * Structure of the consumed data; no default value.
+      *
+      * <p>NOTE(review): no other {@code Config} type is imported by this file, so {@code Config}
+      * here resolves to the connector's own option-holder class in this package (also this
+      * class's superclass). Confirm that this is the intended object type for the option.
+      */
+     public static final Option<Config> SCHEMA =
+             Options.key("schema")
+                     .objectType(Config.class)
+                     .noDefaultValue()
+                     .withDescription(
+                             "The structure of the data, including field names and field types.");
+
+     /** Start mode of the consumer; defaults to {@link StartMode#CONSUME_FROM_GROUP_OFFSETS}. */
+     public static final Option<StartMode> START_MODE =
+             Options.key("start.mode")
+                     .objectType(StartMode.class)
+                     .defaultValue(StartMode.CONSUME_FROM_GROUP_OFFSETS)
+                     .withDescription(
+                             "The initial consumption pattern of consumers,there are several types:\n"
+                                     + "[CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP],[CONSUME_FROM_SPECIFIC_OFFSETS]");
+
+     /** Timestamp used by the timestamp start mode; no default value. */
+     public static final Option<Long> START_MODE_TIMESTAMP =
+             Options.key("start.mode.timestamp")
+                     .longType()
+                     .noDefaultValue()
+                     .withDescription("The time required for consumption mode to be timestamp.");
+
+     /**
+      * Offsets used by the specific-offsets start mode; no default value.
+      *
+      * <p>NOTE(review): same {@code Config} type resolution caveat as on {@link #SCHEMA}.
+      */
+     public static final Option<Config> START_MODE_OFFSETS =
+             Options.key("start.mode.offsets")
+                     .objectType(Config.class)
+                     .noDefaultValue()
+                     .withDescription(
+                             "The offset required for consumption mode to be specific offsets.");
+
+     /** Partition discovery interval of the consumer, in milliseconds; defaults to -1. */
+     public static final Option<Long> KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS =
+             Options.key("partition.discovery.interval.millis")
+                     .longType()
+                     .defaultValue(-1L)
+                     .withDescription(
+                             "The interval for dynamically discovering topics and partitions.");
+
+     /** Pull batch size of the consumer; defaults to 100. */
+     public static final Option<Integer> BATCH_SIZE =
+             Options.key("batch.size")
+                     .intType()
+                     .defaultValue(DEFAULT_BATCH_SIZE)
+                     .withDescription("Rocketmq consumer pull batch size.");
+
+     /** Poll timeout in milliseconds; defaults to 5000. */
+     public static final Option<Long> POLL_TIMEOUT_MILLIS =
+             Options.key("consumer.poll.timeout.millis")
+                     .longType()
+                     .defaultValue(DEFAULT_POLL_TIMEOUT_MILLIS)
+                     .withDescription("The poll timeout in milliseconds.");
+ }
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ProducerConfig.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ProducerConfig.java
new file mode 100644
index 000000000..134a9908f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ProducerConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+ /** Options that apply only to the RocketMq sink (producer side). */
+ public class ProducerConfig extends Config {
+
+ /** Default of {@link #MAX_MESSAGE_SIZE}: 4 * 1024 * 1024 bytes. */
+ public static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 * 4;
+ /** Default of {@link #SEND_MESSAGE_TIMEOUT_MILLIS}: 3000, in milliseconds per the name. */
+ public static final int DEFAULT_SEND_MESSAGE_TIMEOUT_MILLIS = 3000;
+ /** Name of the topic to write to; no default value. */
+ public static final Option<String> TOPIC =
+ Options.key("topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("RocketMq topic name. ");
+
+ /** Producer group id; no default value. */
+ public static final Option<String> PRODUCER_GROUP =
+ Options.key("producer.group")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("RocketMq producer group id.");
+
+ /** Names of the row fields used as the message key; no default value. */
+ public static final Option<List<String>> PARTITION_KEY_FIELDS =
+ Options.key("partition.key.fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription(
+ "Configure which fields are used as the key of the RocketMq message.");
+
+ /** Whether transaction messages are sent; defaults to {@code false}. */
+ public static final Option<Boolean> EXACTLY_ONCE =
+ Options.key("exactly.once")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("If true, the transaction message will be sent.");
+
+ /** Whether messages are sent synchronously; defaults to {@code false}. */
+ public static final Option<Boolean> SEND_SYNC =
+ Options.key("producer.send.sync")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("If true, the message will be sync sent.");
+
+ /** Maximum allowed message body size in bytes; defaults to {@link #DEFAULT_MAX_MESSAGE_SIZE}. */
+ public static final Option<Integer> MAX_MESSAGE_SIZE =
+ Options.key("max.message.size")
+ .intType()
+ .defaultValue(DEFAULT_MAX_MESSAGE_SIZE)
+ .withDescription("Maximum allowed message body size in bytes.");
+
+ /**
+ * Timeout for sending a message; defaults to {@link #DEFAULT_SEND_MESSAGE_TIMEOUT_MILLIS}.
+ * Note that the option key ("send.message.timeout") does not carry the unit.
+ */
+ public static final Option<Integer> SEND_MESSAGE_TIMEOUT_MILLIS =
+ Options.key("send.message.timeout")
+ .intType()
+ .defaultValue(DEFAULT_SEND_MESSAGE_TIMEOUT_MILLIS)
+ .withDescription("Timeout for sending messages.");
+ }
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorErrorCode.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorErrorCode.java
new file mode 100644
index 000000000..618ef7b7f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorErrorCode.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum RocketMqConnectorErrorCode implements SeaTunnelErrorCode {
+ ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED(
+ "ROCKETMQ-01",
+ "Add a split back to the split enumerator failed, it will only happen when a SourceReader failed"),
+ ADD_SPLIT_CHECKPOINT_FAILED("ROCKETMQ-02", "Add the split checkpoint state to reader failed"),
+ CONSUME_DATA_FAILED("ROCKETMQ-03", "Rocketmq failed to consume data"),
+ CONSUME_THREAD_RUN_ERROR(
+ "ROCKETMQ-04", "Error occurred when the rocketmq consumer thread was running"),
+ PRODUCER_SEND_MESSAGE_ERROR("ROCKETMQ-05", "Rocketmq producer failed to send message"),
+ PRODUCER_START_ERROR("ROCKETMQ-06", "Rocketmq producer failed to start"),
+ CONSUMER_START_ERROR("ROCKETMQ-07", "Rocketmq consumer failed to start"),
+
+ UNSUPPORTED_START_MODE_ERROR("ROCKETMQ-08", "Unsupported start mode"),
+
+ GET_CONSUMER_GROUP_OFFSETS_ERROR(
+ "ROCKETMQ-09", "Failed to get the offsets of the current consumer group"),
+
+ GET_CONSUMER_GROUP_OFFSETS_TIMESTAMP_ERROR(
+ "ROCKETMQ-10", "Failed to search offset through timestamp"),
+
+ GET_MIN_AND_MAX_OFFSETS_ERROR("ROCKETMQ-11", "Failed to get topic min and max topic"),
+
+ TOPIC_NOT_EXIST_ERROR("ROCKETMQ-12", "Check the topic for errors"),
+
+ CREATE_TOPIC_ERROR("ROCKETMQ-13", "Failed to create topic"),
+ ;
+
+ private final String code;
+ private final String description;
+
+ RocketMqConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return this.code;
+ }
+
+ @Override
+ public String getDescription() {
+ return this.description;
+ }
+
+ @Override
+ public String getErrorMessage() {
+ return SeaTunnelErrorCode.super.getErrorMessage();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorException.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorException.java
new file mode 100644
index 000000000..f71c63900
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+ /**
+ * Runtime exception raised by the RocketMq connector. Every constructor forwards its arguments
+ * unchanged to {@link SeaTunnelRuntimeException}.
+ */
+ public class RocketMqConnectorException extends SeaTunnelRuntimeException {
+ /**
+ * @param seaTunnelErrorCode error code identifying the failure
+ * @param errorMessage detail message
+ */
+ public RocketMqConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+ super(seaTunnelErrorCode, errorMessage);
+ }
+
+ /**
+ * @param seaTunnelErrorCode error code identifying the failure
+ * @param errorMessage detail message
+ * @param cause underlying cause
+ */
+ public RocketMqConnectorException(
+ SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+ super(seaTunnelErrorCode, errorMessage, cause);
+ }
+
+ /**
+ * @param seaTunnelErrorCode error code identifying the failure
+ * @param cause underlying cause
+ */
+ public RocketMqConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
+ super(seaTunnelErrorCode, cause);
+ }
+ }
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/DefaultSeaTunnelRowSerializer.java
new file mode 100644
index 000000000..67b120633
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/DefaultSeaTunnelRowSerializer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import org.apache.rocketmq.common.message.Message;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.function.Function;
+
+ /**
+  * Default {@link SeaTunnelRowSerializer}: turns a {@link SeaTunnelRow} into a RocketMq {@link
+  * Message} for a fixed topic.
+  *
+  * <p>The message body is produced by a value schema (JSON or text). The message keys are
+  * produced by a key schema: either "no key" or a JSON rendering of selected row fields.
+  */
+ @Slf4j
+ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {
+     private final String topic;
+     private final SerializationSchema keySerialization;
+     private final SerializationSchema valueSerialization;
+
+     /**
+      * Creates a serializer that never sets a message key.
+      *
+      * @param topic target topic
+      * @param seaTunnelRowType type of the rows to serialize
+      * @param format body format
+      * @param delimiter field delimiter, used by the text format only
+      */
+     public DefaultSeaTunnelRowSerializer(
+             String topic,
+             SeaTunnelRowType seaTunnelRowType,
+             SchemaFormat format,
+             String delimiter) {
+         this(
+                 topic,
+                 element -> null,
+                 createSerializationSchema(seaTunnelRowType, format, delimiter));
+     }
+
+     /**
+      * Creates a serializer whose message key is built from the given row fields.
+      *
+      * @param topic target topic
+      * @param keyFieldNames names of the row fields forming the key; null or empty means no key
+      * @param seaTunnelRowType type of the rows to serialize
+      * @param format body format
+      * @param delimiter field delimiter, used by the text format only
+      */
+     public DefaultSeaTunnelRowSerializer(
+             String topic,
+             List<String> keyFieldNames,
+             SeaTunnelRowType seaTunnelRowType,
+             SchemaFormat format,
+             String delimiter) {
+         this(
+                 topic,
+                 createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
+                 createSerializationSchema(seaTunnelRowType, format, delimiter));
+     }
+
+     /**
+      * Creates a serializer from explicit key and value schemas.
+      *
+      * @param topic target topic
+      * @param keySerialization schema producing the key bytes; may return null for "no key"
+      * @param valueSerialization schema producing the body bytes
+      */
+     public DefaultSeaTunnelRowSerializer(
+             String topic,
+             SerializationSchema keySerialization,
+             SerializationSchema valueSerialization) {
+         this.topic = topic;
+         this.keySerialization = keySerialization;
+         this.valueSerialization = valueSerialization;
+     }
+
+     /** Builds the body schema for the requested format. */
+     private static SerializationSchema createSerializationSchema(
+             SeaTunnelRowType rowType, SchemaFormat format, String delimiter) {
+         switch (format) {
+             case TEXT:
+                 return TextSerializationSchema.builder()
+                         .seaTunnelRowType(rowType)
+                         .delimiter(delimiter)
+                         .build();
+             case JSON:
+                 return new JsonSerializationSchema(rowType);
+             default:
+                 throw new SeaTunnelJsonFormatException(
+                         CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+         }
+     }
+
+     /**
+      * Builds the key schema: a JSON serialization of a sub-row holding only the key fields, or
+      * a schema that always yields null when no key fields are configured.
+      */
+     private static SerializationSchema createKeySerializationSchema(
+             List<String> keyFieldNames, SeaTunnelRowType seaTunnelRowType) {
+         if (keyFieldNames == null || keyFieldNames.isEmpty()) {
+             return element -> null;
+         }
+         // Resolve each key field once to its position and type in the full row.
+         int[] keyFieldIndexArr = new int[keyFieldNames.size()];
+         SeaTunnelDataType<?>[] keyFieldDataTypeArr = new SeaTunnelDataType<?>[keyFieldNames.size()];
+         for (int i = 0; i < keyFieldNames.size(); i++) {
+             String keyFieldName = keyFieldNames.get(i);
+             int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
+             keyFieldIndexArr[i] = rowFieldIndex;
+             keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
+         }
+         SeaTunnelRowType keyType =
+                 new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
+         SerializationSchema keySerializationSchema = new JsonSerializationSchema(keyType);
+         // Projects a full row onto a row that holds only the key fields, in configured order.
+         Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor =
+                 row -> {
+                     Object[] keyFields = new Object[keyFieldIndexArr.length];
+                     for (int i = 0; i < keyFieldIndexArr.length; i++) {
+                         keyFields[i] = row.getField(keyFieldIndexArr[i]);
+                     }
+                     return new SeaTunnelRow(keyFields);
+                 };
+         return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
+     }
+
+     /**
+      * Serializes a row into a message for the configured topic, with no tags.
+      *
+      * @param row row to serialize
+      * @return the message, or null when the value schema yields no body for the row
+      */
+     @Override
+     public Message serializeRow(SeaTunnelRow row) {
+         byte[] value = valueSerialization.serialize(row);
+         if (value == null) {
+             return null;
+         }
+         byte[] key = keySerialization.serialize(row);
+         // Decode explicitly as UTF-8 so the message key does not depend on the JVM's default
+         // charset (the JSON key schema emits UTF-8 bytes).
+         String keys =
+                 key == null ? null : new String(key, java.nio.charset.StandardCharsets.UTF_8);
+         return new Message(topic, null, keys, value);
+     }
+ }
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 000000000..6c04d92f7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.rocketmq.common.message.Message;
+
+ /**
+ * Converts SeaTunnel rows into RocketMq messages.
+ *
+ * <p>NOTE(review): the type parameters {@code K} and {@code V} are not used by any member of
+ * this interface; presumably they name the key and value representations. Confirm or drop.
+ *
+ * @param <K> not referenced by the interface
+ * @param <V> not referenced by the interface
+ */
+ public interface SeaTunnelRowSerializer<K, V> {
+
+ /**
+ * Serialize the {@link SeaTunnelRow} to a RocketMq {@link Message}.
+ *
+ * @param row seatunnel row
+ * @return rocketmq record; DefaultSeaTunnelRowSerializer returns null when the row's value
+ * serializes to null, so callers should be prepared for a null result
+ */
+ Message serializeRow(SeaTunnelRow row);
+ }
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/ProducerMetadata.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/ProducerMetadata.java
new file mode 100644
index 000000000..34add2fd5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/ProducerMetadata.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+ /**
+ * Serializable holder of the producer-side settings. Accessors are generated by Lombok's
+ * {@code @Data}.
+ *
+ * <p>NOTE(review): the fields presumably mirror the options declared in ProducerConfig and
+ * Config; the code that populates them is outside this excerpt.
+ */
+ @Data
+ public class ProducerMetadata implements Serializable {
+ /** Basic RocketMq configuration. */
+ private RocketMqBaseConfiguration configuration;
+ /** Topic that messages are sent to. */
+ private String topic;
+
+ /** Names of the row fields used as the partition/message key. */
+ private List<String> partitionKeyFields;
+ /** Delivery semantics flag: whether exactly-once is requested. */
+ private boolean exactlyOnce;
+ /** Schema format of the message payload. */
+ private SchemaFormat format;
+
+ /** Field delimiter for the data format. */
+ private String fieldDelimiter;
+
+ /** Whether the producer sends synchronously. */
+ private boolean sync;
+ }
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqNoTransactionSender.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqNoTransactionSender.java
new file mode 100644
index 000000000..92fa68a39
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqNoTransactionSender.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode.PRODUCER_SEND_MESSAGE_ERROR;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode.PRODUCER_START_ERROR;
+
+@Slf4j
+public class RocketMqNoTransactionSender implements RocketMqProducerSender {
+
+ private final DefaultMQProducer rocketMqProducer;
+ private final boolean isSync;
+
+ public RocketMqNoTransactionSender(RocketMqBaseConfiguration configuration, boolean isSync) {
+ this.isSync = isSync;
+ this.rocketMqProducer = RocketMqAdminUtil.initDefaultMqProducer(configuration);
+ try {
+ this.rocketMqProducer.start();
+ } catch (MQClientException e) {
+ throw new RocketMqConnectorException(PRODUCER_START_ERROR, e);
+ }
+ }
+
+ @Override
+ public void send(Message message) {
+ if (message == null) {
+ return;
+ }
+ try {
+ if (isSync) {
+ if (StringUtils.isEmpty(message.getKeys())) {
+ this.rocketMqProducer.send(message);
+ } else {
+ this.rocketMqProducer.send(
+ message, new SelectMessageQueueByHash(), message.getKeys());
+ }
+ } else {
+ SendCallback callback =
+ new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ // No-op
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ log.error("Failed to send data to rocketmq", e);
+ }
+ };
+ if (StringUtils.isEmpty(message.getKeys())) {
+ this.rocketMqProducer.send(message, callback);
+ } else {
+ this.rocketMqProducer.send(
+ message, new SelectMessageQueueByHash(), message.getKeys(), callback);
+ }
+ }
+ } catch (MQClientException
+ | RemotingException
+ | InterruptedException
+ | MQBrokerException e) {
+ throw new RocketMqConnectorException(PRODUCER_SEND_MESSAGE_ERROR, e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (rocketMqProducer != null) {
+ this.rocketMqProducer.shutdown();
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqProducerSender.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqProducerSender.java
new file mode 100644
index 000000000..5deded333
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqProducerSender.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.rocketmq.common.message.Message;
+
+public interface RocketMqProducerSender extends AutoCloseable { // Sink-side sender contract; being AutoCloseable, implementations also provide close().
+
+ /** Sends a single message to RocketMQ; the delivery style (e.g. synchronous, callback-based or transactional) is decided by the implementation. */
+ void send(Message message);
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
new file mode 100644
index 000000000..04ea6bc3e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.ProducerConfig;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.DEFAULT_FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.NAME_SRV_ADDR;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.SECRET_KEY;
+
+/**
+ * RocketMQ sink: turns the user supplied {@link Config} into a {@link ProducerMetadata} and
+ * creates {@link RocketMqSinkWriter} instances from it.
+ */
+@AutoService(SeaTunnelSink.class)
+public class RocketMqSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private static final String DEFAULT_PRODUCER_GROUP = "SeaTunnel-Producer-Group";
+    private SeaTunnelRowType seaTunnelRowType;
+    private ProducerMetadata producerMetadata;
+
+    @Override
+    public String getPluginName() {
+        return "Rocketmq";
+    }
+
+    /**
+     * Validates the configuration and fills {@link #producerMetadata}.
+     *
+     * @param config sink configuration; the topic and the name server address are mandatory
+     * @throws RocketMqConnectorException when a mandatory option is missing, or when ACL is
+     *     enabled without both the access key and the secret key
+     */
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        requireMandatoryOptions(config);
+
+        producerMetadata = new ProducerMetadata();
+        producerMetadata.setTopic(config.getString(ProducerConfig.TOPIC.key()));
+        this.producerMetadata.setConfiguration(buildBaseConfiguration(config));
+
+        applySerializationOptions(config);
+        applyDeliveryOptions(config);
+    }
+
+    /** Fails when the topic or the name server address is absent from the configuration. */
+    private void requireMandatoryOptions(Config config) {
+        CheckResult checkResult =
+                CheckConfigUtil.checkAllExists(
+                        config, ProducerConfig.TOPIC.key(), NAME_SRV_ADDR.key());
+        if (checkResult.isSuccess()) {
+            return;
+        }
+        throw new RocketMqConnectorException(
+                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format(
+                        "PluginName: %s, PluginType: %s, Message: %s",
+                        getPluginName(), PluginType.SINK, checkResult.getMsg()));
+    }
+
+    /** Builds the producer-side base configuration: address, ACL, group, size and timeout. */
+    private RocketMqBaseConfiguration buildBaseConfiguration(Config config) {
+        RocketMqBaseConfiguration.Builder builder =
+                RocketMqBaseConfiguration.newBuilder()
+                        .producer()
+                        .namesrvAddr(config.getString(NAME_SRV_ADDR.key()));
+
+        // ACL credentials are only looked at when the ACL switch itself is configured.
+        boolean aclEnabled = ACL_ENABLED.defaultValue();
+        if (config.hasPath(ACL_ENABLED.key())) {
+            aclEnabled = config.getBoolean(ACL_ENABLED.key());
+            boolean hasAccessKey = config.hasPath(ACCESS_KEY.key());
+            boolean hasSecretKey = config.hasPath(SECRET_KEY.key());
+            if (aclEnabled && !(hasAccessKey && hasSecretKey)) {
+                throw new RocketMqConnectorException(
+                        SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
+                        "When ACL_ENABLED true , ACCESS_KEY and SECRET_KEY must be configured");
+            }
+            if (hasAccessKey) {
+                builder.accessKey(config.getString(ACCESS_KEY.key()));
+            }
+            if (hasSecretKey) {
+                builder.secretKey(config.getString(SECRET_KEY.key()));
+            }
+        }
+        builder.aclEnable(aclEnabled);
+
+        // Producer group: configured value, else the SeaTunnel default group.
+        builder.groupId(
+                config.hasPath(ProducerConfig.PRODUCER_GROUP.key())
+                        ? config.getString(ProducerConfig.PRODUCER_GROUP.key())
+                        : DEFAULT_PRODUCER_GROUP);
+
+        if (!config.hasPath(ProducerConfig.MAX_MESSAGE_SIZE.key())) {
+            builder.maxMessageSize(ProducerConfig.MAX_MESSAGE_SIZE.defaultValue());
+        } else {
+            builder.maxMessageSize(config.getInt(ProducerConfig.MAX_MESSAGE_SIZE.key()));
+        }
+
+        if (!config.hasPath(ProducerConfig.SEND_MESSAGE_TIMEOUT_MILLIS.key())) {
+            builder.sendMsgTimeout(ProducerConfig.SEND_MESSAGE_TIMEOUT_MILLIS.defaultValue());
+        } else {
+            builder.sendMsgTimeout(
+                    config.getInt(ProducerConfig.SEND_MESSAGE_TIMEOUT_MILLIS.key()));
+        }
+
+        return builder.build();
+    }
+
+    /** Copies format, field delimiter and partition key fields into the metadata. */
+    private void applySerializationOptions(Config config) {
+        SchemaFormat format = SchemaFormat.JSON;
+        if (config.hasPath(FORMAT.key())) {
+            format = SchemaFormat.valueOf(config.getString(FORMAT.key()).toUpperCase());
+        }
+        producerMetadata.setFormat(format);
+
+        producerMetadata.setFieldDelimiter(
+                config.hasPath(FIELD_DELIMITER.key())
+                        ? config.getString(FIELD_DELIMITER.key())
+                        : DEFAULT_FIELD_DELIMITER);
+
+        if (config.hasPath(ProducerConfig.PARTITION_KEY_FIELDS.key())) {
+            producerMetadata.setPartitionKeyFields(
+                    config.getStringList(ProducerConfig.PARTITION_KEY_FIELDS.key()));
+        }
+    }
+
+    /** Copies the exactly-once and sync-send switches into the metadata. */
+    private void applyDeliveryOptions(Config config) {
+        boolean exactlyOnceFlag = ProducerConfig.EXACTLY_ONCE.defaultValue();
+        if (config.hasPath(ProducerConfig.EXACTLY_ONCE.key())) {
+            exactlyOnceFlag = config.getBoolean(ProducerConfig.EXACTLY_ONCE.key());
+        }
+        producerMetadata.setExactlyOnce(exactlyOnceFlag);
+
+        boolean syncFlag = ProducerConfig.SEND_SYNC.defaultValue();
+        if (config.hasPath(ProducerConfig.SEND_SYNC.key())) {
+            syncFlag = config.getBoolean(ProducerConfig.SEND_SYNC.key());
+        }
+        producerMetadata.setSync(syncFlag);
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    /** Creates a writer from the metadata prepared earlier and the row type set on this sink. */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
+            throws IOException {
+        return new RocketMqSinkWriter(producerMetadata, seaTunnelRowType);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
new file mode 100644
index 000000000..efad35dea
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.ProducerConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RocketMqSinkFactory implements TableSinkFactory { // Exposes the RocketMQ sink's factory identifier and its option rule.
+ @Override
+ public String factoryIdentifier() {
+ return "Rocketmq"; // same string RocketMqSink#getPluginName returns
+ }
+
+ @Override
+ public OptionRule optionRule() { // NOTE(review): RocketMqSink#prepare also reads ACL_ENABLED, ACCESS_KEY, SECRET_KEY, FORMAT and FIELD_DELIMITER, none of which are declared here - confirm whether they should be listed as optional.
+ return OptionRule.builder()
+ .required(ProducerConfig.TOPIC, Config.NAME_SRV_ADDR) // the two keys RocketMqSink#prepare verifies with checkAllExists
+ .optional(
+ ProducerConfig.PRODUCER_GROUP,
+ ProducerConfig.PARTITION_KEY_FIELDS,
+ ProducerConfig.EXACTLY_ONCE,
+ ProducerConfig.SEND_SYNC,
+ ProducerConfig.MAX_MESSAGE_SIZE,
+ ProducerConfig.SEND_MESSAGE_TIMEOUT_MILLIS)
+ .build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
new file mode 100644
index 000000000..a1799f2b1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.SeaTunnelRowSerializer;
+
+import org.apache.rocketmq.common.message.Message;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Sink writer that serializes each {@link SeaTunnelRow} into a RocketMQ {@link Message} and hands
+ * it to a {@link RocketMqProducerSender}.
+ */
+public class RocketMqSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+    private final ProducerMetadata producerMetadata;
+    private final SeaTunnelRowSerializer seaTunnelRowSerializer;
+    private final RocketMqProducerSender rocketMqProducerSender;
+
+    /**
+     * Builds the serializer first (which validates the partition key fields) and then the sender.
+     *
+     * @param producerMetadata producer settings prepared by the sink
+     * @param seaTunnelRowType row type of the incoming records
+     */
+    public RocketMqSinkWriter(
+            ProducerMetadata producerMetadata, SeaTunnelRowType seaTunnelRowType) {
+        this.producerMetadata = producerMetadata;
+        this.seaTunnelRowSerializer = getSerializer(seaTunnelRowType);
+        this.rocketMqProducerSender = createSender(producerMetadata);
+    }
+
+    /** Picks the transactional sender for exactly-once, the plain sender otherwise. */
+    private static RocketMqProducerSender createSender(ProducerMetadata metadata) {
+        if (!metadata.isExactlyOnce()) {
+            return new RocketMqNoTransactionSender(metadata.getConfiguration(), metadata.isSync());
+        }
+        return new RocketMqTransactionSender(metadata.getConfiguration());
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        Message serialized = seaTunnelRowSerializer.serializeRow(element);
+        rocketMqProducerSender.send(serialized);
+    }
+
+    /** Closes the sender, wrapping any failure in a {@link RocketMqConnectorException}. */
+    @Override
+    public void close() throws IOException {
+        if (this.rocketMqProducerSender == null) {
+            return;
+        }
+        try {
+            this.rocketMqProducerSender.close();
+        } catch (Exception closeFailure) {
+            throw new RocketMqConnectorException(
+                    CommonErrorCode.WRITER_OPERATION_FAILED,
+                    "Close RocketMq sink writer error",
+                    closeFailure);
+        }
+    }
+
+    private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
+            SeaTunnelRowType seaTunnelRowType) {
+        List<String> keyFields = getPartitionKeyFields(seaTunnelRowType);
+        return new DefaultSeaTunnelRowSerializer(
+                producerMetadata.getTopic(),
+                keyFields,
+                seaTunnelRowType,
+                producerMetadata.getFormat(),
+                producerMetadata.getFieldDelimiter());
+    }
+
+    /**
+     * Returns the configured partition key fields, or an empty list when none are configured.
+     *
+     * @throws RocketMqConnectorException when a configured field is not part of the row type
+     */
+    private List<String> getPartitionKeyFields(SeaTunnelRowType seaTunnelRowType) {
+        List<String> configuredKeys = producerMetadata.getPartitionKeyFields();
+        if (configuredKeys == null) {
+            return Collections.emptyList();
+        }
+        // Every configured key must be one of the row type's field names.
+        List<String> knownFields = Arrays.asList(seaTunnelRowType.getFieldNames());
+        for (String key : configuredKeys) {
+            if (knownFields.contains(key)) {
+                continue;
+            }
+            throw new RocketMqConnectorException(
+                    CommonErrorCode.ILLEGAL_ARGUMENT,
+                    String.format(
+                            "Partition key field not found: %s, rowType: %s", key, knownFields));
+        }
+        return configuredKeys;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqTransactionSender.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqTransactionSender.java
new file mode 100644
index 000000000..d3a77f06b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqTransactionSender.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode.PRODUCER_SEND_MESSAGE_ERROR;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode.PRODUCER_START_ERROR;
+
+public class RocketMqTransactionSender implements RocketMqProducerSender {
+
+ private static final String TXN_PARAM = "SeaTunnel-RocketMq";
+ private final TransactionMQProducer transactionMQProducer;
+
+ public RocketMqTransactionSender(RocketMqBaseConfiguration configuration) {
+ this.transactionMQProducer =
+ RocketMqAdminUtil.initTransactionMqProducer(
+ configuration,
+ new TransactionListener() {
+ @Override
+ public LocalTransactionState executeLocalTransaction(
+ Message msg, Object arg) {
+ return LocalTransactionState.COMMIT_MESSAGE;
+ }
+
+ @Override
+ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+ return LocalTransactionState.COMMIT_MESSAGE;
+ }
+ });
+ try {
+ this.transactionMQProducer.start();
+ } catch (MQClientException e) {
+ throw new RocketMqConnectorException(PRODUCER_START_ERROR, e);
+ }
+ }
+
+ @Override
+ public void send(Message message) {
+ try {
+ transactionMQProducer.sendMessageInTransaction(
+ message,
+ StringUtils.isEmpty(message.getKeys()) ? TXN_PARAM : message.getKeys());
+ } catch (MQClientException e) {
+ throw new RocketMqConnectorException(PRODUCER_SEND_MESSAGE_ERROR, e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (transactionMQProducer != null) {
+ this.transactionMQProducer.shutdown();
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/ConsumerMetadata.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/ConsumerMetadata.java
new file mode 100644
index 000000000..77597ed16
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/ConsumerMetadata.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/** Serializable holder (Lombok {@code @Data}) for the consumer settings that {@code RocketMqSource#prepare} resolves from the job configuration. */
+@Data
+public class ConsumerMetadata implements Serializable {
+ private RocketMqBaseConfiguration baseConfig = RocketMqBaseConfiguration.newBuilder().build(); // initial value from an otherwise untouched builder; RocketMqSource#prepare replaces it via setBaseConfig
+ private List<String> topics; // topic names to consume
+ private boolean enabledCommitCheckpoint = false; // RocketMqConsumerThread passes the negation of this flag to initDefaultLitePullConsumer - presumably toggling auto-commit; confirm
+ private StartMode startMode; // start mode selected for the consumer
+ private Map<MessageQueue, Long> specificStartOffsets; // filled by RocketMqSource#prepare only for CONSUME_FROM_SPECIFIC_OFFSETS
+ private Long startOffsetsTimestamp; // filled by RocketMqSource#prepare only for CONSUME_FROM_TIMESTAMP
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
new file mode 100644
index 000000000..0c0786569
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * Runnable that owns one {@link DefaultLitePullConsumer} and executes queued tasks against it,
+ * so that every interaction with the consumer happens on the thread running this object.
+ */
+public class RocketMqConsumerThread implements Runnable {
+    private final DefaultLitePullConsumer consumer;
+    private final ConsumerMetadata metadata;
+    private final LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> tasks;
+
+    /**
+     * Creates the pull consumer from the metadata and starts it.
+     *
+     * @param metadata consumer settings; the negated commit-on-checkpoint flag is passed as the
+     *     second argument of {@code RocketMqAdminUtil.initDefaultLitePullConsumer}
+     * @throws RocketMqConnectorException with {@code CONSUMER_START_ERROR} when the consumer
+     *     cannot be started
+     */
+    public RocketMqConsumerThread(ConsumerMetadata metadata) {
+        this.metadata = metadata;
+        this.tasks = new LinkedBlockingQueue<>();
+        this.consumer =
+                RocketMqAdminUtil.initDefaultLitePullConsumer(
+                        this.metadata.getBaseConfig(), !metadata.isEnabledCommitCheckpoint());
+        try {
+            this.consumer.start();
+        } catch (MQClientException e) {
+            // Start rocketmq failed
+            throw new RocketMqConnectorException(
+                    RocketMqConnectorErrorCode.CONSUMER_START_ERROR, e);
+        }
+    }
+
+    /**
+     * Polls the task queue (waiting up to one second per poll) and runs each task with the
+     * consumer until the current thread is interrupted.
+     *
+     * @throws RocketMqConnectorException with {@code CONSUME_THREAD_RUN_ERROR} when the thread is
+     *     interrupted while waiting for a task
+     */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                Consumer<DefaultLitePullConsumer> task = tasks.poll(1, TimeUnit.SECONDS);
+                if (task != null) {
+                    task.accept(consumer);
+                }
+            } catch (InterruptedException e) {
+                // poll() clears the interrupt flag when it throws; set it again so whoever
+                // handles the wrapped exception can still see that the thread was interrupted.
+                Thread.currentThread().interrupt();
+                throw new RocketMqConnectorException(
+                        RocketMqConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, e);
+            }
+        }
+    }
+
+    /** Returns the queue through which callers submit work for the consumer. */
+    public LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> getTasks() {
+        return tasks;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
new file mode 100644
index 000000000..947a95a38
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.ConsumerConfig;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.DEFAULT_FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.NAME_SRV_ADDR;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.SECRET_KEY;
+import static org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions.concise;
+
+/** RocketMq source */
+@AutoService(SeaTunnelSource.class)
+public class RocketMqSource
+ implements SeaTunnelSource<SeaTunnelRow, RocketMqSourceSplit, RocketMqSourceState>,
+ SupportParallelism {
+
+ private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
+ private final ConsumerMetadata metadata = new ConsumerMetadata();
+ private JobContext jobContext;
+ private SeaTunnelRowType typeInfo;
+ private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+ private long discoveryIntervalMillis =
+ ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();
+
+ @Override
+ public String getPluginName() {
+ return "Rocketmq"; // plugin name of this source; also used in the validation error message built in prepare()
+ }
+
+    /** Reports the source as bounded for batch jobs and as unbounded for any other job mode. */
+    @Override
+    public Boundedness getBoundedness() {
+        if (JobMode.BATCH.equals(jobContext.getJobMode())) {
+            return Boundedness.BOUNDED;
+        }
+        return Boundedness.UNBOUNDED;
+    }
+
+ @Override
+ public void prepare(Config config) throws PrepareFailException {
+ // check config
+ CheckResult result =
+ CheckConfigUtil.checkAllExists(
+ config, ConsumerConfig.TOPICS.key(), NAME_SRV_ADDR.key());
+ if (!result.isSuccess()) {
+ throw new RocketMqConnectorException(
+ SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+ String.format(
+ "PluginName: %s, PluginType: %s, Message: %s",
+ getPluginName(), PluginType.SOURCE, result.getMsg()));
+ }
+ this.metadata.setTopics(
+ Arrays.asList(
+ config.getString(ConsumerConfig.TOPICS.key())
+ .split(DEFAULT_FIELD_DELIMITER)));
+
+ RocketMqBaseConfiguration.Builder baseConfigBuilder =
+ RocketMqBaseConfiguration.newBuilder()
+ .consumer()
+ .namesrvAddr(config.getString(NAME_SRV_ADDR.key()));
+ boolean aclEnabled = ACL_ENABLED.defaultValue();
+ if (config.hasPath(ACL_ENABLED.key())) {
+ aclEnabled = config.getBoolean(ACL_ENABLED.key());
+ if (aclEnabled
+ && (!config.hasPath(ACCESS_KEY.key()) || !config.hasPath(SECRET_KEY.key()))) {
+ throw new RocketMqConnectorException(
+ SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
+ "When ACL_ENABLED "
+ + "true , ACCESS_KEY and SECRET_KEY must be configured");
+ }
+ if (config.hasPath(ACCESS_KEY.key())) {
+ baseConfigBuilder.accessKey(config.getString(ACCESS_KEY.key()));
+ }
+ if (config.hasPath(SECRET_KEY.key())) {
+ baseConfigBuilder.secretKey(config.getString(SECRET_KEY.key()));
+ }
+ }
+ baseConfigBuilder.aclEnable(aclEnabled);
+ // config consumer group
+ if (config.hasPath(ConsumerConfig.CONSUMER_GROUP.key())) {
+ baseConfigBuilder.groupId(config.getString(ConsumerConfig.CONSUMER_GROUP.key()));
+ } else {
+ baseConfigBuilder.groupId(DEFAULT_CONSUMER_GROUP);
+ }
+ if (config.hasPath(ConsumerConfig.BATCH_SIZE.key())) {
+ baseConfigBuilder.batchSize(config.getInt(ConsumerConfig.BATCH_SIZE.key()));
+ } else {
+ baseConfigBuilder.batchSize(ConsumerConfig.BATCH_SIZE.defaultValue());
+ }
+ if (config.hasPath(ConsumerConfig.POLL_TIMEOUT_MILLIS.key())) {
+ baseConfigBuilder.pollTimeoutMillis(
+ config.getInt(ConsumerConfig.POLL_TIMEOUT_MILLIS.key()));
+ } else {
+ baseConfigBuilder.pollTimeoutMillis(ConsumerConfig.POLL_TIMEOUT_MILLIS.defaultValue());
+ }
+ this.metadata.setBaseConfig(baseConfigBuilder.build());
+
+ // auto commit
+ if (config.hasPath(ConsumerConfig.COMMIT_ON_CHECKPOINT.key())) {
+ this.metadata.setEnabledCommitCheckpoint(
+ config.getBoolean(ConsumerConfig.COMMIT_ON_CHECKPOINT.key()));
+ } else {
+ this.metadata.setEnabledCommitCheckpoint(
+ ConsumerConfig.COMMIT_ON_CHECKPOINT.defaultValue());
+ }
+
+ StartMode startMode = ConsumerConfig.START_MODE.defaultValue();
+ if (config.hasPath(ConsumerConfig.START_MODE.key())) {
+ startMode =
+ StartMode.valueOf(
+ config.getString(ConsumerConfig.START_MODE.key()).toUpperCase());
+ switch (startMode) {
+ case CONSUME_FROM_TIMESTAMP:
+ long startOffsetsTimestamp =
+ config.getLong(ConsumerConfig.START_MODE_TIMESTAMP.key());
+ long currentTimestamp = System.currentTimeMillis();
+ if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > currentTimestamp) {
+ throw new IllegalArgumentException(
+ "The offsets timestamp value is smaller than 0 or smaller"
+ + " than the current time");
+ }
+ this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
+ break;
+ case CONSUME_FROM_SPECIFIC_OFFSETS:
+ Config offsets = config.getConfig(ConsumerConfig.START_MODE_OFFSETS.key());
+ ConfigRenderOptions options = concise();
+ String offsetsJson = offsets.root().render(options);
+ if (offsetsJson == null) {
+ throw new IllegalArgumentException(
+ "start mode is "
+ + StartMode.CONSUME_FROM_SPECIFIC_OFFSETS
+ + "but no specific"
+ + " offsets were specified.");
+ }
+ Map<MessageQueue, Long> specificStartOffsets = new HashMap<>();
+ ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
+ jsonNodes
+ .fieldNames()
+ .forEachRemaining(
+ key -> {
+ int splitIndex = key.lastIndexOf("-");
+ String topic = key.substring(0, splitIndex);
+ String partition = key.substring(splitIndex + 1);
+ long offset = jsonNodes.get(key).asLong();
+ MessageQueue messageQueue =
+ new MessageQueue(
+ topic, null, Integer.valueOf(partition));
+ specificStartOffsets.put(messageQueue, offset);
+ });
+ this.metadata.setSpecificStartOffsets(specificStartOffsets);
+ break;
+ default:
+ break;
+ }
+ }
+ this.metadata.setStartMode(startMode);
+
+ if (config.hasPath(ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
+ this.discoveryIntervalMillis =
+ config.getLong(ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
+ }
+
+ // set deserialization
+ setDeserialization(config);
+ }
+
+    /**
+     * Keeps a reference to the job context supplied by the caller.
+     *
+     * @param jobContext the context to remember on this source
+     */
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+    }
+
+    /**
+     * Returns the row type selected by {@code setDeserialization}.
+     *
+     * @return the data type of the rows produced by this source
+     */
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return typeInfo;
+    }
+
+    /**
+     * Builds a reader that shares this source's consumer metadata and deserialization schema.
+     *
+     * @param readerContext context passed through to the new reader
+     * @return a new {@link RocketMqSourceReader}
+     * @throws Exception declared by the interface; nothing is thrown explicitly here
+     */
+    @Override
+    public SourceReader<SeaTunnelRow, RocketMqSourceSplit> createReader(
+            SourceReader.Context readerContext) throws Exception {
+        return new RocketMqSourceReader(metadata, this.deserializationSchema, readerContext);
+    }
+
+    /**
+     * Builds a fresh split enumerator using the configured discovery interval.
+     *
+     * @param context enumerator context passed through to the new enumerator
+     * @return a new {@link RocketMqSourceSplitEnumerator}
+     * @throws Exception declared by the interface; nothing is thrown explicitly here
+     */
+    @Override
+    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> createEnumerator(
+            SourceSplitEnumerator.Context<RocketMqSourceSplit> context) throws Exception {
+        return new RocketMqSourceSplitEnumerator(
+                metadata, context, this.discoveryIntervalMillis);
+    }
+
+    /**
+     * Builds a split enumerator when restoring from a checkpoint.
+     *
+     * <p>NOTE(review): {@code sourceState} is not handed to the enumerator, so the result is
+     * built exactly like {@code createEnumerator} does - confirm this is intended.
+     *
+     * @param context enumerator context passed through to the new enumerator
+     * @param sourceState previously snapshotted state (currently unused)
+     * @return a new {@link RocketMqSourceSplitEnumerator}
+     * @throws Exception declared by the interface; nothing is thrown explicitly here
+     */
+    @Override
+    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> restoreEnumerator(
+            SourceSplitEnumerator.Context<RocketMqSourceSplit> context,
+            RocketMqSourceState sourceState)
+            throws Exception {
+        return new RocketMqSourceSplitEnumerator(
+                metadata, context, this.discoveryIntervalMillis);
+    }
+
+    /**
+     * Chooses the produced row type and the deserialization schema from the configuration.
+     *
+     * <p>Without a schema option the source falls back to the simple text schema with the
+     * {@code \002} delimiter. With a schema option the format defaults to JSON unless the
+     * format option is set; TEXT uses the configured field delimiter or the default one.
+     *
+     * @param config the source configuration
+     */
+    private void setDeserialization(Config config) {
+        if (!config.hasPath(ConsumerConfig.SCHEMA.key())) {
+            // No schema configured: plain text rows.
+            typeInfo = CatalogTableUtil.buildSimpleTextSchema();
+            this.deserializationSchema =
+                    TextDeserializationSchema.builder()
+                            .seaTunnelRowType(typeInfo)
+                            .delimiter(String.valueOf('\002'))
+                            .build();
+            return;
+        }
+
+        typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+        SchemaFormat format =
+                config.hasPath(FORMAT.key())
+                        ? SchemaFormat.find(config.getString(FORMAT.key()))
+                        : SchemaFormat.JSON;
+        switch (format) {
+            case JSON:
+                deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+                break;
+            case TEXT:
+                String delimiter =
+                        config.hasPath(FIELD_DELIMITER.key())
+                                ? config.getString(FIELD_DELIMITER.key())
+                                : DEFAULT_FIELD_DELIMITER;
+                deserializationSchema =
+                        TextDeserializationSchema.builder()
+                                .seaTunnelRowType(typeInfo)
+                                .delimiter(delimiter)
+                                .build();
+                break;
+            default:
+                throw new SeaTunnelJsonFormatException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
new file mode 100644
index 000000000..e1f756f0b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.ConsumerConfig;
+
+import com.google.auto.service.AutoService;
+
+/** Table source factory that exposes {@link RocketMqSource} and its option rules. */
+@AutoService(Factory.class)
+public class RocketMqSourceFactory implements TableSourceFactory {
+
+    /** Value returned by {@link #factoryIdentifier()}. */
+    private static final String IDENTIFIER = "Rocketmq";
+
+    /** @return the identifier of this factory */
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Declares the options of the source: topics and name server address are required, the
+     * start-mode timestamp/offsets options are tied to the matching start mode, and the
+     * remaining options are optional.
+     *
+     * @return the option rule of the source
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                // mandatory connection settings
+                .required(ConsumerConfig.TOPICS, Config.NAME_SRV_ADDR)
+                // tuning and format settings
+                .optional(
+                        Config.FORMAT,
+                        ConsumerConfig.START_MODE,
+                        ConsumerConfig.CONSUMER_GROUP,
+                        ConsumerConfig.COMMIT_ON_CHECKPOINT,
+                        ConsumerConfig.SCHEMA,
+                        ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+                        ConsumerConfig.POLL_TIMEOUT_MILLIS,
+                        ConsumerConfig.BATCH_SIZE)
+                // options bound to a specific start mode
+                .conditional(
+                        ConsumerConfig.START_MODE,
+                        StartMode.CONSUME_FROM_TIMESTAMP,
+                        ConsumerConfig.START_MODE_TIMESTAMP)
+                .conditional(
+                        ConsumerConfig.START_MODE,
+                        StartMode.CONSUME_FROM_SPECIFIC_OFFSETS,
+                        ConsumerConfig.START_MODE_OFFSETS)
+                .build();
+    }
+
+    /** @return the source implementation class created through this factory */
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return RocketMqSource.class;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
new file mode 100644
index 000000000..c44307d16
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class RocketMqSourceReader implements SourceReader<SeaTunnelRow, RocketMqSourceSplit> {
+
+ private static final long THREAD_WAIT_TIME = 500L;
+
+ private final SourceReader.Context context;
+ private final ConsumerMetadata metadata;
+ private final Set<RocketMqSourceSplit> sourceSplits;
+ private final Map<Long, Map<MessageQueue, Long>> checkpointOffsets;
+ private final Map<MessageQueue, RocketMqConsumerThread> consumerThreads;
+ private final ExecutorService executorService;
+ private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+ private final LinkedBlockingQueue<RocketMqSourceSplit> pendingPartitionsQueue;
+
+ private volatile boolean running = false;
+
+    /**
+     * Creates a reader with empty split/offset bookkeeping and a cached thread pool whose
+     * threads are named {@code RocketMq Source Data Consumer}.
+     *
+     * @param metadata consumer metadata handed to every consumer thread
+     * @param deserializationSchema schema used to turn message bodies into rows
+     * @param context reader context supplied by the engine
+     */
+    public RocketMqSourceReader(
+            ConsumerMetadata metadata,
+            DeserializationSchema<SeaTunnelRow> deserializationSchema,
+            SourceReader.Context context) {
+        this.context = context;
+        this.metadata = metadata;
+        this.deserializationSchema = deserializationSchema;
+        this.sourceSplits = new HashSet<>();
+        this.pendingPartitionsQueue = new LinkedBlockingQueue<>();
+        this.checkpointOffsets = new ConcurrentHashMap<>();
+        this.consumerThreads = new ConcurrentHashMap<>();
+        this.executorService =
+                Executors.newCachedThreadPool(
+                        task -> new Thread(task, "RocketMq Source Data Consumer"));
+    }
+
+    /** Does nothing; consumer threads are started lazily from {@code pollNext}. */
+    @Override
+    public void open() throws Exception {
+        // Intentionally empty.
+    }
+
+    /**
+     * Stops the consumer thread pool via {@code shutdownNow()}.
+     *
+     * @throws IOException declared by the interface; not thrown explicitly here
+     */
+    @Override
+    public void close() throws IOException {
+        if (executorService == null) {
+            return;
+        }
+        executorService.shutdownNow();
+    }
+
+    /**
+     * Polls every known split once and emits the deserialized message bodies to {@code output}.
+     *
+     * <p>For each split a task is queued on the consumer thread owning the split's message
+     * queue; this method blocks until that task has finished before moving on to the next
+     * split. In bounded mode the reader signals "no more elements" after one pass.
+     *
+     * @param output collector receiving the deserialized rows
+     * @throws Exception if sleeping is interrupted or a poll task fails (the failure is
+     *     rethrown by {@code CompletableFuture.join()})
+     */
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        if (!running) {
+            // No split has been added yet; back off before the next call.
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        // Move everything queued by addSplits() into the working set in one call.
+        pendingPartitionsQueue.drainTo(sourceSplits);
+        // Lazily start one consumer thread per message queue.
+        sourceSplits.forEach(
+                sourceSplit ->
+                        consumerThreads.computeIfAbsent(
+                                sourceSplit.getMessageQueue(),
+                                s -> {
+                                    RocketMqConsumerThread thread =
+                                            new RocketMqConsumerThread(metadata);
+                                    executorService.submit(thread);
+                                    return thread;
+                                }));
+        sourceSplits.forEach(
+                sourceSplit -> {
+                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    try {
+                        consumerThreads
+                                .get(sourceSplit.getMessageQueue())
+                                .getTasks()
+                                .put(
+                                        consumer -> {
+                                            try {
+                                                Set<MessageQueue> messageQueues =
+                                                        Sets.newHashSet(
+                                                                sourceSplit.getMessageQueue());
+                                                consumer.assign(messageQueues);
+                                                // A negative start offset means "not set";
+                                                // only seek when one is known.
+                                                if (sourceSplit.getStartOffset() >= 0) {
+                                                    consumer.seek(
+                                                            sourceSplit.getMessageQueue(),
+                                                            sourceSplit.getStartOffset());
+                                                }
+                                                List<MessageExt> records =
+                                                        consumer.poll(
+                                                                metadata.getBaseConfig()
+                                                                        .getPollTimeoutMillis());
+                                                if (records.isEmpty()) {
+                                                    log.warn(
+                                                            "Rocketmq consumer can not pull data, split {}, start offset {}, end offset {}",
+                                                            sourceSplit.getMessageQueue(),
+                                                            sourceSplit.getStartOffset(),
+                                                            sourceSplit.getEndOffset());
+                                                }
+                                                Map<MessageQueue, List<MessageExt>> groupRecords =
+                                                        records.stream()
+                                                                .collect(
+                                                                        Collectors.groupingBy(
+                                                                                record ->
+                                                                                        new MessageQueue(
+                                                                                                record
+                                                                                                        .getTopic(),
+                                                                                                record
+                                                                                                        .getBrokerName(),
+                                                                                                record
+                                                                                                        .getQueueId())));
+                                                for (MessageQueue messageQueue : messageQueues) {
+                                                    if (!groupRecords.containsKey(messageQueue)) {
+                                                        continue;
+                                                    }
+                                                    List<MessageExt> messages =
+                                                            groupRecords.get(messageQueue);
+                                                    for (MessageExt record : messages) {
+                                                        deserializationSchema.deserialize(
+                                                                record.getBody(), output);
+                                                        // Bounded mode stops emitting once the
+                                                        // split's end offset has been reached.
+                                                        if (Boundedness.BOUNDED.equals(
+                                                                        context.getBoundedness())
+                                                                && record.getQueueOffset()
+                                                                        >= sourceSplit
+                                                                                .getEndOffset()) {
+                                                            break;
+                                                        }
+                                                    }
+                                                    long lastOffset = -1;
+                                                    if (!messages.isEmpty()) {
+                                                        lastOffset =
+                                                                messages.get(messages.size() - 1)
+                                                                        .getQueueOffset();
+                                                        sourceSplit.setStartOffset(lastOffset);
+                                                    }
+
+                                                    if (lastOffset >= sourceSplit.getEndOffset()) {
+                                                        sourceSplit.setEndOffset(lastOffset);
+                                                    }
+                                                }
+                                                completableFuture.complete(null);
+                                            } catch (Throwable t) {
+                                                // Always complete the future, even for errors
+                                                // that are not Exceptions; otherwise join()
+                                                // below would block forever.
+                                                completableFuture.completeExceptionally(t);
+                                            }
+                                        });
+                    } catch (InterruptedException e) {
+                        // Keep the interrupt visible to callers further up the stack.
+                        Thread.currentThread().interrupt();
+                        throw new RocketMqConnectorException(
+                                RocketMqConnectorErrorCode.CONSUME_DATA_FAILED, e);
+                    }
+                    // Wait for the consumer thread to finish this split's poll.
+                    completableFuture.join();
+                });
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            context.signalNoMoreElement();
+        }
+    }
+
+    /**
+     * Copies the current splits and records each split's start offset under the checkpoint id.
+     *
+     * @param checkpointId id of the checkpoint being taken
+     * @return copies of all splits currently held by this reader
+     * @throws Exception declared by the interface; not thrown explicitly here
+     */
+    @Override
+    public List<RocketMqSourceSplit> snapshotState(long checkpointId) throws Exception {
+        List<RocketMqSourceSplit> snapshot =
+                sourceSplits.stream().map(RocketMqSourceSplit::copy).collect(Collectors.toList());
+        Map<MessageQueue, Long> offsetsOfCheckpoint =
+                checkpointOffsets.computeIfAbsent(checkpointId, id -> Maps.newConcurrentMap());
+        snapshot.forEach(
+                copied ->
+                        offsetsOfCheckpoint.put(
+                                copied.getMessageQueue(), copied.getStartOffset()));
+        return snapshot;
+    }
+
+    /**
+     * Queues the given splits so that the next {@code pollNext} call picks them up, and marks
+     * the reader as running.
+     *
+     * @param splits splits assigned to this reader
+     * @throws RocketMqConnectorException if the thread is interrupted while queueing; the
+     *     interrupt flag is restored before throwing
+     */
+    @Override
+    public void addSplits(List<RocketMqSourceSplit> splits) {
+        running = true;
+        for (RocketMqSourceSplit split : splits) {
+            try {
+                pendingPartitionsQueue.put(split);
+            } catch (InterruptedException e) {
+                // Restore the flag so the interruption is not lost to the caller.
+                Thread.currentThread().interrupt();
+                throw new RocketMqConnectorException(
+                        RocketMqConnectorErrorCode.ADD_SPLIT_CHECKPOINT_FAILED, e);
+            }
+        }
+    }
+
+    /** Does nothing; this reader takes no action on the notification. */
+    @Override
+    public void handleNoMoreSplits() {
+        // Intentionally empty.
+    }
+
+    /**
+     * Commits the offsets recorded for a completed checkpoint.
+     *
+     * <p>The offsets stored by {@code snapshotState} are removed from the bookkeeping map and,
+     * when commit-on-checkpoint is enabled, an offset-store update is queued on the consumer
+     * thread of every message queue. Queues without a consumer thread are skipped with a
+     * warning instead of failing with a {@code NullPointerException}.
+     *
+     * @param checkpointId id of the completed checkpoint
+     * @throws Exception declared by the interface; not thrown explicitly here
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // remove() both checks for presence and takes ownership in one step.
+        Map<MessageQueue, Long> messageQueueOffset = checkpointOffsets.remove(checkpointId);
+        if (messageQueueOffset == null) {
+            log.warn("checkpoint {} do not exist or have already been committed.", checkpointId);
+            return;
+        }
+        if (!this.metadata.isEnabledCommitCheckpoint()) {
+            // Nothing to commit; avoid queueing tasks that would do nothing.
+            return;
+        }
+        for (Map.Entry<MessageQueue, Long> entry : messageQueueOffset.entrySet()) {
+            MessageQueue messageQueue = entry.getKey();
+            Long offset = entry.getValue();
+            if (messageQueue == null || offset == null) {
+                continue;
+            }
+            RocketMqConsumerThread consumerThread = consumerThreads.get(messageQueue);
+            if (consumerThread == null) {
+                log.warn(
+                        "No consumer thread for message queue {}, skip committing offset {}",
+                        messageQueue,
+                        offset);
+                continue;
+            }
+            try {
+                consumerThread
+                        .getTasks()
+                        .put(
+                                consumer -> {
+                                    consumer.getOffsetStore()
+                                            .updateOffset(messageQueue, offset, false);
+                                    consumer.getOffsetStore().persist(messageQueue);
+                                });
+            } catch (InterruptedException e) {
+                // Restore the flag and stop: further put() calls would fail the same way.
+                Thread.currentThread().interrupt();
+                log.error("commit offset failed", e);
+                return;
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplit.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplit.java
new file mode 100644
index 000000000..90fa136c5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplit.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * Source split describing one RocketMQ message queue together with the offset range to read.
+ * Both offsets start at {@code -1} until they are set.
+ */
+public class RocketMqSourceSplit implements SourceSplit {
+    private MessageQueue messageQueue;
+    private long startOffset = -1L;
+    private long endOffset = -1L;
+
+    /** Creates a split without a message queue; offsets keep their {@code -1} defaults. */
+    public RocketMqSourceSplit() {}
+
+    /**
+     * Creates a split for the given queue; offsets keep their {@code -1} defaults.
+     *
+     * @param messageQueue the queue this split reads
+     */
+    public RocketMqSourceSplit(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    /**
+     * Creates a split for the given queue and offset range.
+     *
+     * @param messageQueue the queue this split reads
+     * @param startOffset start offset of the split
+     * @param endOffset end offset of the split
+     */
+    public RocketMqSourceSplit(MessageQueue messageQueue, long startOffset, long endOffset) {
+        this.messageQueue = messageQueue;
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+    }
+
+    /** @return the queue this split reads */
+    public MessageQueue getMessageQueue() {
+        return this.messageQueue;
+    }
+
+    /** @param messageQueue the queue this split reads */
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    /** @return the start offset of the split */
+    public long getStartOffset() {
+        return this.startOffset;
+    }
+
+    /** @param startOffset the new start offset */
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    /** @return the end offset of the split */
+    public long getEndOffset() {
+        return this.endOffset;
+    }
+
+    /** @param endOffset the new end offset */
+    public void setEndOffset(long endOffset) {
+        this.endOffset = endOffset;
+    }
+
+    /**
+     * Builds the split id as {@code topic-brokerName-queueId}.
+     *
+     * @return the id of this split
+     */
+    @Override
+    public String splitId() {
+        return String.join(
+                "-",
+                this.messageQueue.getTopic(),
+                this.messageQueue.getBrokerName(),
+                String.valueOf(this.messageQueue.getQueueId()));
+    }
+
+    /**
+     * Creates a new split with the same queue reference and the current offsets.
+     *
+     * @return a copy of this split
+     */
+    public RocketMqSourceSplit copy() {
+        return new RocketMqSourceSplit(messageQueue, startOffset, endOffset);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
new file mode 100644
index 000000000..9a4912cd9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class RocketMqSourceSplitEnumerator
+ implements SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> {
+
+ private static final long DEFAULT_DISCOVERY_INTERVAL_MILLIS = 60 * 1000;
+ private final Map<MessageQueue, RocketMqSourceSplit> assignedSplit;
+ private final ConsumerMetadata metadata;
+ private final Context<RocketMqSourceSplit> context;
+ private final Map<MessageQueue, RocketMqSourceSplit> pendingSplit;
+ private ScheduledExecutorService executor;
+ private ScheduledFuture scheduledFuture;
+ // ms
+ private long discoveryIntervalMillis;
+
+    /**
+     * Creates an enumerator with empty pending/assigned split maps.
+     *
+     * @param metadata consumer metadata (topics, base config, start mode)
+     * @param context enumerator context used to assign splits
+     */
+    public RocketMqSourceSplitEnumerator(
+            ConsumerMetadata metadata, SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
+        this.context = context;
+        this.metadata = metadata;
+        this.pendingSplit = new HashMap<>();
+        this.assignedSplit = new HashMap<>();
+    }
+
+    /**
+     * Creates an enumerator and remembers the requested discovery interval.
+     *
+     * @param metadata consumer metadata (topics, base config, start mode)
+     * @param context enumerator context used to assign splits
+     * @param discoveryIntervalMillis discovery interval in milliseconds; {@code open()} replaces
+     *     non-positive values with the default
+     */
+    public RocketMqSourceSplitEnumerator(
+            ConsumerMetadata metadata,
+            SourceSplitEnumerator.Context<RocketMqSourceSplit> context,
+            long discoveryIntervalMillis) {
+        this(metadata, context);
+        this.discoveryIntervalMillis = discoveryIntervalMillis;
+    }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private static int getSplitOwner(MessageQueue messageQueue, int numReaders) {
+ int startIndex = ((messageQueue.getQueueId() * 31) & 0x7FFFFFFF) % numReaders;
+ return (startIndex + messageQueue.getQueueId()) % numReaders;
+ }
+
+    /**
+     * Starts the periodic discovery task on a single daemon thread.
+     *
+     * <p>A non-positive interval is replaced by the default before scheduling, so the task is
+     * always scheduled. Failures inside a discovery run are logged and do not stop the schedule.
+     */
+    @Override
+    public void open() {
+        if (discoveryIntervalMillis <= 0) {
+            discoveryIntervalMillis = DEFAULT_DISCOVERY_INTERVAL_MILLIS;
+        }
+        // The interval is guaranteed positive from here on.
+        this.executor =
+                Executors.newScheduledThreadPool(
+                        1,
+                        runnable -> {
+                            Thread discoveryThread =
+                                    new Thread(runnable, "RocketMq-messageQueue-dynamic-discovery");
+                            discoveryThread.setDaemon(true);
+                            return discoveryThread;
+                        });
+        Runnable discoveryTask =
+                () -> {
+                    try {
+                        discoverySplits();
+                    } catch (Exception e) {
+                        log.error("Dynamic discovery failure:", e);
+                    }
+                };
+        this.scheduledFuture =
+                executor.scheduleWithFixedDelay(
+                        discoveryTask,
+                        discoveryIntervalMillis,
+                        discoveryIntervalMillis,
+                        TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Performs the initial enumeration: collect pending splits, set their start offsets
+     * according to the start mode, then assign them to readers.
+     *
+     * @throws Exception if resolving the start offsets fails
+     */
+    @Override
+    public void run() throws Exception {
+        this.fetchPendingPartitionSplit();
+        this.setPartitionStartOffset();
+        this.assignSplit();
+    }
+
+    /**
+     * Cancels the discovery task and shuts the scheduler down, if a task was scheduled.
+     *
+     * @throws IOException declared by the interface; not thrown explicitly here
+     */
+    @Override
+    public void close() throws IOException {
+        if (scheduledFuture == null) {
+            return;
+        }
+        scheduledFuture.cancel(false);
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
+ @Override
+ public void addSplitsBack(List<RocketMqSourceSplit> splits, int subtaskId) {
+ if (!splits.isEmpty()) {
+ pendingSplit.putAll(convertToNextSplit(splits));
+ assignSplit();
+ }
+ }
+
+    /**
+     * Advances the given splits to their next range: the start offset becomes the old end
+     * offset plus one and the end offset becomes the value {@code listOffsets} reports for
+     * {@code CONSUME_FROM_LAST_OFFSET}. The splits are modified in place.
+     *
+     * @param splits splits to advance
+     * @return the same split instances keyed by their message queue
+     * @throws RocketMqConnectorException wrapping any failure during the conversion
+     */
+    private Map<MessageQueue, ? extends RocketMqSourceSplit> convertToNextSplit(
+            List<RocketMqSourceSplit> splits) {
+        try {
+            List<MessageQueue> queues =
+                    splits.stream()
+                            .map(RocketMqSourceSplit::getMessageQueue)
+                            .collect(Collectors.toList());
+            Map<MessageQueue, Long> latestOffsets =
+                    listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+            for (RocketMqSourceSplit split : splits) {
+                split.setStartOffset(split.getEndOffset() + 1);
+                split.setEndOffset(latestOffsets.get(split.getMessageQueue()));
+            }
+            return splits.stream()
+                    .collect(
+                            Collectors.toMap(
+                                    RocketMqSourceSplit::getMessageQueue, split -> split));
+        } catch (Exception e) {
+            throw new RocketMqConnectorException(
+                    RocketMqConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
+        }
+    }
+
+    /** @return the number of splits currently waiting in {@code pendingSplit} */
+    @Override
+    public int currentUnassignedSplitSize() {
+        return this.pendingSplit.size();
+    }
+
+    /**
+     * Does nothing; split requests are not acted upon by this enumerator.
+     *
+     * @param subtaskId id of the requesting subtask (ignored)
+     */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        // Intentionally empty.
+    }
+
+    /**
+     * Assigns any pending splits when a reader registers.
+     *
+     * @param subtaskId id of the registering subtask (not used)
+     */
+    @Override
+    public void registerReader(int subtaskId) {
+        if (pendingSplit.isEmpty()) {
+            return;
+        }
+        assignSplit();
+    }
+
+    /**
+     * Captures the currently assigned splits.
+     *
+     * @param checkpointId id of the checkpoint being taken (not used)
+     * @return a state object holding a set of all assigned splits
+     * @throws Exception declared by the interface; not thrown explicitly here
+     */
+    @Override
+    public RocketMqSourceState snapshotState(long checkpointId) throws Exception {
+        Set<RocketMqSourceSplit> assigned = new HashSet<>(assignedSplit.values());
+        return new RocketMqSourceState(assigned);
+    }
+
+    /**
+     * Does nothing; the enumerator takes no action when a checkpoint completes.
+     *
+     * @param checkpointId id of the completed checkpoint (ignored)
+     * @throws Exception declared by the interface; never thrown here
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // Intentionally empty.
+    }
+
+    /** One discovery round: pick up queues not seen before and assign them to readers. */
+    private void discoverySplits() {
+        this.fetchPendingPartitionSplit();
+        this.assignSplit();
+    }
+
+    /**
+     * Adds every split reported by {@code getTopicInfo()} to {@code pendingSplit} unless its
+     * message queue is already assigned or already pending.
+     */
+    private void fetchPendingPartitionSplit() {
+        for (RocketMqSourceSplit split : getTopicInfo()) {
+            MessageQueue queue = split.getMessageQueue();
+            if (assignedSplit.containsKey(queue)) {
+                continue;
+            }
+            // Keeps an existing pending entry untouched.
+            pendingSplit.putIfAbsent(queue, split);
+        }
+    }
+
+    /**
+     * Queries the offsets of the configured topics and builds one split per message queue,
+     * ranging from the queue's minimum to its maximum offset.
+     *
+     * @return splits for all message queues of the configured topics
+     */
+    private Set<RocketMqSourceSplit> getTopicInfo() {
+        log.info("Configured topics: {}", metadata.getTopics());
+        List<Map<MessageQueue, TopicOffset>> offsetTopics =
+                RocketMqAdminUtil.offsetTopics(metadata.getBaseConfig(), metadata.getTopics());
+        Set<RocketMqSourceSplit> discovered = Sets.newConcurrentHashSet();
+        for (Map<MessageQueue, TopicOffset> queueOffsets : offsetTopics) {
+            for (Map.Entry<MessageQueue, TopicOffset> entry : queueOffsets.entrySet()) {
+                TopicOffset range = entry.getValue();
+                discovered.add(
+                        new RocketMqSourceSplit(
+                                entry.getKey(), range.getMinOffset(), range.getMaxOffset()));
+            }
+        }
+        return discovered;
+    }
+
+    /**
+     * Resolves the start offset of every pending split according to the configured start mode
+     * and writes it into the split.
+     *
+     * <p>For group offsets, an empty result falls back to the first offsets. For specific
+     * offsets, the configured queues get their broker name filled in before matching.
+     *
+     * @throws MQClientException declared for callers; not thrown explicitly in this method
+     * @throws RocketMqConnectorException if the start mode is not supported
+     */
+    private void setPartitionStartOffset() throws MQClientException {
+        Collection<MessageQueue> queues = pendingSplit.keySet();
+        Map<MessageQueue, Long> startOffsets;
+        switch (metadata.getStartMode()) {
+            case CONSUME_FROM_FIRST_OFFSET:
+                startOffsets = listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+                break;
+            case CONSUME_FROM_LAST_OFFSET:
+                startOffsets = listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+                break;
+            case CONSUME_FROM_TIMESTAMP:
+                startOffsets = listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
+                break;
+            case CONSUME_FROM_GROUP_OFFSETS:
+                startOffsets = listConsumerGroupOffsets(queues);
+                if (startOffsets.isEmpty()) {
+                    // No group offsets reported: start from the beginning instead.
+                    startOffsets =
+                            listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+                }
+                break;
+            case CONSUME_FROM_SPECIFIC_OFFSETS:
+                startOffsets = metadata.getSpecificStartOffsets();
+                // Fill in broker name
+                setMessageQueueBroker(queues, startOffsets);
+                break;
+            default:
+                throw new RocketMqConnectorException(
+                        RocketMqConnectorErrorCode.UNSUPPORTED_START_MODE_ERROR,
+                        metadata.getStartMode().name());
+        }
+        for (Map.Entry<MessageQueue, Long> entry : startOffsets.entrySet()) {
+            if (pendingSplit.containsKey(entry.getKey())) {
+                pendingSplit.get(entry.getKey()).setStartOffset(entry.getValue());
+            }
+        }
+    }
+
+    /**
+     * Fills in the broker name of the queues in {@code topicPartitionOffsets}, matching them to
+     * the known queues by {@code topic-queueId}.
+     *
+     * <p>Several known queues can share the same topic and queue id while having different
+     * broker names. Without a merge function {@code Collectors.toMap} throws
+     * {@code IllegalStateException} in that case, so the lexicographically smallest broker name
+     * is kept and a warning is logged.
+     *
+     * <p>NOTE(review): the keys of {@code topicPartitionOffsets} are modified in place; callers
+     * in this class only iterate that map afterwards - confirm no lookup by key relies on it.
+     *
+     * @param topicPartitions known message queues carrying broker names
+     * @param topicPartitionOffsets configured offsets whose queue keys get a broker name
+     */
+    private void setMessageQueueBroker(
+            Collection<MessageQueue> topicPartitions,
+            Map<MessageQueue, Long> topicPartitionOffsets) {
+        Map<String, String> flatTopicPartitions =
+                topicPartitions.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        messageQueue ->
+                                                messageQueue.getTopic()
+                                                        + "-"
+                                                        + messageQueue.getQueueId(),
+                                        MessageQueue::getBrokerName,
+                                        (first, second) -> {
+                                            String kept =
+                                                    first.compareTo(second) <= 0 ? first : second;
+                                            log.warn(
+                                                    "Same topic/queue id found on brokers {} and {}, using {}",
+                                                    first,
+                                                    second,
+                                                    kept);
+                                            return kept;
+                                        }));
+        for (MessageQueue messageQueue : topicPartitionOffsets.keySet()) {
+            String key = messageQueue.getTopic() + "-" + messageQueue.getQueueId();
+            if (flatTopicPartitions.containsKey(key)) {
+                messageQueue.setBrokerName(flatTopicPartitions.get(key));
+            }
+        }
+    }
+
+    /**
+     * Looks up one offset per message queue for the given position.
+     *
+     * <p>First/last offsets come from the min/max offsets reported for the configured topics;
+     * timestamp offsets are searched using the configured start timestamp. Any other position
+     * yields an empty result.
+     *
+     * @param messageQueues queues to resolve
+     * @param consumeFromWhere position to resolve
+     * @return offsets keyed by message queue
+     */
+    private Map<MessageQueue, Long> listOffsets(
+            Collection<MessageQueue> messageQueues, ConsumeFromWhere consumeFromWhere) {
+        Map<MessageQueue, Long> offsets = Maps.newConcurrentMap();
+        Map<MessageQueue, TopicOffset> brokerOffsets =
+                RocketMqAdminUtil.flatOffsetTopics(metadata.getBaseConfig(), metadata.getTopics());
+        switch (consumeFromWhere) {
+            case CONSUME_FROM_FIRST_OFFSET:
+                for (MessageQueue queue : messageQueues) {
+                    offsets.put(queue, brokerOffsets.get(queue).getMinOffset());
+                }
+                break;
+            case CONSUME_FROM_LAST_OFFSET:
+                for (MessageQueue queue : messageQueues) {
+                    offsets.put(queue, brokerOffsets.get(queue).getMaxOffset());
+                }
+                break;
+            case CONSUME_FROM_TIMESTAMP:
+                offsets.putAll(
+                        RocketMqAdminUtil.searchOffsetsByTimestamp(
+                                metadata.getBaseConfig(),
+                                messageQueues,
+                                metadata.getStartOffsetsTimestamp()));
+                break;
+            default:
+                // Other positions are not resolved here.
+                break;
+        }
+        return offsets;
+    }
+
+    /**
+     * Fetches the current offsets for the given queues through {@code RocketMqAdminUtil},
+     * using this enumerator's base configuration and topics.
+     *
+     * @param messageQueues queues to look up
+     * @return offsets keyed by message queue
+     */
+    public Map<MessageQueue, Long> listConsumerGroupOffsets(
+            Collection<MessageQueue> messageQueues) {
+        Set<MessageQueue> queueSet = new HashSet<>(messageQueues);
+        return RocketMqAdminUtil.currentOffsets(
+                metadata.getBaseConfig(), metadata.getTopics(), queueSet);
+    }
+
+    /**
+     * Distributes all pending splits that are not yet assigned over the readers, then moves
+     * every pending entry into {@code assignedSplit}.
+     *
+     * <p>Every reader index receives an assignment call, possibly with an empty list.
+     */
+    private synchronized void assignSplit() {
+        int parallelism = context.currentParallelism();
+        Map<Integer, List<RocketMqSourceSplit>> splitsByReader =
+                new HashMap<>(Common.COLLECTION_SIZE);
+        for (int reader = 0; reader < parallelism; reader++) {
+            splitsByReader.computeIfAbsent(reader, id -> new ArrayList<>());
+        }
+        for (Map.Entry<MessageQueue, RocketMqSourceSplit> pending : pendingSplit.entrySet()) {
+            if (assignedSplit.containsKey(pending.getKey())) {
+                continue;
+            }
+            int owner = getSplitOwner(pending.getKey(), parallelism);
+            splitsByReader.get(owner).add(pending.getValue());
+        }
+        for (Map.Entry<Integer, List<RocketMqSourceSplit>> assignment :
+                splitsByReader.entrySet()) {
+            context.assignSplit(assignment.getKey(), assignment.getValue());
+        }
+        assignedSplit.putAll(pendingSplit);
+        pendingSplit.clear();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceState.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceState.java
new file mode 100644
index 000000000..a3d923757
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/** Serializable enumerator state holding the set of assigned splits. */
+public class RocketMqSourceState implements Serializable {
+
+    private Set<RocketMqSourceSplit> assignSplits;
+
+    /**
+     * Creates a state around the given splits; the set is stored as is, not copied.
+     *
+     * @param assignSplits the assigned splits
+     */
+    public RocketMqSourceState(Set<RocketMqSourceSplit> assignSplits) {
+        setAssignSplits(assignSplits);
+    }
+
+    /** @return the stored set of assigned splits */
+    public Set<RocketMqSourceSplit> getAssignSplits() {
+        return this.assignSplits;
+    }
+
+    /** @param assignSplits the set of assigned splits to store */
+    public void setAssignSplits(Set<RocketMqSourceSplit> assignSplits) {
+        this.assignSplits = assignSplits;
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 34e4e850c..e9c7268d9 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -70,6 +70,7 @@
<module>connector-tdengine</module>
<module>connector-selectdb-cloud</module>
<module>connector-hbase</module>
+ <module>connector-rocketmq</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 72a4011cc..e9fc30f84 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -480,6 +480,13 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-rocketmq</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- jdbc driver -->
<dependency>
<groupId>com.aliyun.phoenix</groupId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/pom.xml
similarity index 51%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/pom.xml
index a0e839d77..285df10dd 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/pom.xml
@@ -18,78 +18,46 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-e2e</artifactId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
-
- <artifactId>seatunnel-connector-v2-e2e</artifactId>
- <packaging>pom</packaging>
- <name>SeaTunnel : E2E : Connector V2 :</name>
-
- <modules>
- <module>connector-assert-e2e</module>
- <module>connector-jdbc-e2e</module>
- <module>connector-redis-e2e</module>
- <module>connector-cdc-sqlserver-e2e</module>
- <module>connector-clickhouse-e2e</module>
- <module>connector-starrocks-e2e</module>
- <module>connector-influxdb-e2e</module>
- <module>connector-amazondynamodb-e2e</module>
- <module>connector-file-local-e2e</module>
- <module>connector-cassandra-e2e</module>
- <module>connector-neo4j-e2e</module>
- <module>connector-http-e2e</module>
- <module>connector-rabbitmq-e2e</module>
- <module>connector-kafka-e2e</module>
- <module>connector-doris-e2e</module>
- <module>connector-fake-e2e</module>
- <module>connector-elasticsearch-e2e</module>
- <module>connector-iotdb-e2e</module>
- <module>connector-cdc-mysql-e2e</module>
- <module>connector-iceberg-e2e</module>
- <module>connector-iceberg-hadoop3-e2e</module>
- <module>connector-tdengine-e2e</module>
- <module>connector-datahub-e2e</module>
- <module>connector-mongodb-e2e</module>
- <module>connector-hbase-e2e</module>
- <module>connector-maxcompute-e2e</module>
- </modules>
+ <artifactId>connector-rocketmq-e2e</artifactId>
<dependencies>
+ <!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-e2e-common</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-13-starter</artifactId>
+ <artifactId>connector-rocketmq</artifactId>
<version>${project.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-15-starter</artifactId>
+ <artifactId>connector-console</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-2-starter</artifactId>
+ <artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-3-starter</artifactId>
+ <artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-starter</artifactId>
+ <artifactId>seatunnel-transforms-v2</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java
new file mode 100644
index 000000000..94c50ae05
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.rocketmq;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import lombok.SneakyThrows;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+/**
+ * Testcontainers wrapper that starts a RocketMQ name server and a broker inside one container and
+ * then adjusts the broker configuration through {@code mqadmin} once the container is running.
+ */
+public class RocketMqContainer extends GenericContainer<RocketMqContainer> {
+
+    /** Port the name server listens on inside the container. */
+    public static final int NAMESRV_PORT = 9876;
+
+    /** Port the broker listens on inside the container. */
+    public static final int BROKER_PORT = 10911;
+
+    /** Broker name written into the broker configuration after startup. */
+    public static final String BROKER_NAME = "broker-a";
+
+    /** Value written to the broker's {@code brokerPermission} setting after startup. */
+    private static final int DEFAULT_BROKER_PERMISSION = 6;
+
+    /**
+     * Creates the container and exposes the name server port, the broker port and the port two
+     * below the broker port.
+     *
+     * @param image the RocketMQ docker image to run
+     */
+    public RocketMqContainer(DockerImageName image) {
+        super(image);
+        // NOTE(review): BROKER_PORT - 2 is presumably the broker's VIP channel port - confirm.
+        withExposedPorts(NAMESRV_PORT, BROKER_PORT, BROKER_PORT - 2);
+    }
+
+    /** Sets the container command: name server in the background, broker in the foreground. */
+    @Override
+    protected void configure() {
+        final String startScript =
+                "#!/bin/bash\n" + "./mqnamesrv &\n" + "./mqbroker -n localhost:" + NAMESRV_PORT;
+        withCommand("sh", "-c", startScript);
+    }
+
+    /**
+     * Rewrites the broker configuration inside the running container so that the broker advertises
+     * this host's address and the mapped broker port.
+     *
+     * @param containerInfo inspection data supplied by Testcontainers (unused)
+     * @throws IllegalStateException if the chained {@code mqadmin} commands exit with a non-zero
+     *     code, or if the local network interfaces cannot be enumerated
+     */
+    @Override
+    @SneakyThrows
+    protected void containerIsStarted(InspectContainerResponse containerInfo) {
+        List<String> configCommands = new ArrayList<>();
+        configCommands.add(updateBrokerConfig("autoCreateTopicEnable", true));
+        configCommands.add(updateBrokerConfig("brokerName", BROKER_NAME));
+        configCommands.add(updateBrokerConfig("brokerIP1", getLinuxLocalIp()));
+        configCommands.add(updateBrokerConfig("listenPort", getMappedPort(BROKER_PORT)));
+        configCommands.add(updateBrokerConfig("brokerPermission", DEFAULT_BROKER_PERMISSION));
+
+        // Chain with && so the first failing update aborts the rest and sets the exit code.
+        final String chained = String.join(" && ", configCommands);
+        ExecResult result = execInContainer("/bin/sh", "-c", chained);
+        if (result != null && result.getExitCode() != 0) {
+            throw new IllegalStateException(result.toString());
+        }
+    }
+
+    /**
+     * Builds one {@code mqadmin updateBrokerConfig} command line targeting the in-container broker.
+     *
+     * @param key broker configuration key
+     * @param val value to set; rendered with string concatenation
+     * @return the shell command line
+     */
+    private String updateBrokerConfig(final String key, final Object val) {
+        final String brokerAddr = "localhost:" + BROKER_PORT;
+        return "./mqadmin updateBrokerConfig -b " + brokerAddr + " -k " + key + " -v " + val;
+    }
+
+    /**
+     * Returns the name server address as seen from the host.
+     *
+     * @return {@code host:mappedPort} of the name server
+     */
+    public String getNameSrvAddr() {
+        return String.format("%s:%s", getHost(), getMappedPort(NAMESRV_PORT));
+    }
+
+    /**
+     * Finds a non-loopback IPv4 address of the machine running the test.
+     *
+     * <p>All interfaces and addresses are scanned and the last match wins.
+     *
+     * @return the address in textual form, or an empty string if no interface has a non-loopback
+     *     IPv4 address
+     * @throws IllegalStateException if the network interfaces cannot be enumerated
+     */
+    public String getLinuxLocalIp() {
+        final Enumeration<NetworkInterface> networkInterfaces;
+        try {
+            networkInterfaces = NetworkInterface.getNetworkInterfaces();
+        } catch (SocketException ex) {
+            // Surface the root cause instead of printing it and handing an empty brokerIP1
+            // value to mqadmin.
+            throw new IllegalStateException("Failed to enumerate local network interfaces", ex);
+        }
+
+        String ip = "";
+        while (networkInterfaces.hasMoreElements()) {
+            Enumeration<InetAddress> inetAddresses =
+                    networkInterfaces.nextElement().getInetAddresses();
+            while (inetAddresses.hasMoreElements()) {
+                InetAddress inetAddress = inetAddresses.nextElement();
+                if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) {
+                    ip = inetAddress.getHostAddress();
+                }
+            }
+        }
+        return ip;
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
new file mode 100644
index 000000000..fed95a41d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.rocketmq;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.engine.common.Constant;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.seatunnel.e2e.connector.rocketmq.RocketMqContainer.NAMESRV_PORT;
+
+/**
+ * End-to-end tests for the RocketMQ source and sink connectors.
+ *
+ * <p>A {@link RocketMqContainer} is started once for the whole class. Source tests pre-load topics
+ * through a plain {@link DefaultMQProducer}; sink tests read the written topics back with a lite
+ * pull consumer.
+ */
+@Slf4j
+public class RocketMqIT extends TestSuiteBase implements TestResource {
+
+    private static final String IMAGE = "apache/rocketmq:4.9.4";
+    private static final String ROCKETMQ_GROUP = "SeaTunnel-rocketmq-group";
+    private static final String HOST = "rocketmq-e2e";
+    private static final SchemaFormat DEFAULT_FORMAT = SchemaFormat.JSON;
+    private static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    /** Row layout of the records produced by {@link #generateTestData}. */
+    private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {
+                        "id",
+                        "c_map",
+                        "c_array",
+                        "c_string",
+                        "c_boolean",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_bytes",
+                        "c_date",
+                        "c_timestamp"
+                    },
+                    new SeaTunnelDataType[] {
+                        BasicType.LONG_TYPE,
+                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                        ArrayType.BYTE_ARRAY_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.BOOLEAN_TYPE,
+                        BasicType.BYTE_TYPE,
+                        BasicType.SHORT_TYPE,
+                        BasicType.INT_TYPE,
+                        BasicType.LONG_TYPE,
+                        BasicType.FLOAT_TYPE,
+                        BasicType.DOUBLE_TYPE,
+                        new DecimalType(2, 1),
+                        PrimitiveByteArrayType.INSTANCE,
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE
+                    });
+
+    private RocketMqContainer rocketMqContainer;
+    private DefaultMQProducer producer;
+
+    /**
+     * Starts the RocketMQ container, creates the shared producer and writes rows with ids 0..99 to
+     * {@code test_topic_source} in the default (JSON) format.
+     */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        this.rocketMqContainer =
+                new RocketMqContainer(DockerImageName.parse(IMAGE))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+                        .waitingFor(
+                                new HostPortWaitStrategy()
+                                        .withStartupTimeout(Duration.ofMinutes(2)));
+        // Bind the name server port to the same port number on the host.
+        rocketMqContainer.setPortBindings(
+                Lists.newArrayList(String.format("%s:%s", NAMESRV_PORT, NAMESRV_PORT)));
+        rocketMqContainer.start();
+        log.info("RocketMq container started");
+        initProducer();
+        log.info("Write 100 records to topic test_topic_source");
+        DefaultSeaTunnelRowSerializer serializer =
+                new DefaultSeaTunnelRowSerializer(
+                        "test_topic_source",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(serializer::serializeRow, "test_topic_source", 0, 100);
+    }
+
+    /** Creates and starts the producer used to pre-load source topics. */
+    @SneakyThrows
+    private void initProducer() {
+        this.producer = new DefaultMQProducer();
+        this.producer.setNamesrvAddr(rocketMqContainer.getNameSrvAddr());
+        this.producer.setInstanceName(UUID.randomUUID().toString());
+        this.producer.setProducerGroup(ROCKETMQ_GROUP);
+        this.producer.setLanguage(LanguageCode.JAVA);
+        this.producer.setSendMsgTimeout(15000);
+        this.producer.start();
+    }
+
+    /** Shuts down the producer and closes the container if they were created. */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (this.producer != null) {
+            this.producer.shutdown();
+        }
+        if (this.rocketMqContainer != null) {
+            this.rocketMqContainer.close();
+        }
+    }
+
+    /**
+     * Runs the fake-to-RocketMQ JSON sink job and checks that 10 records arrived in {@code
+     * test_topic} and that a message key parses as JSON containing the partition key fields.
+     */
+    @TestTemplate
+    public void testSinkRocketMq(TestContainer container) throws IOException, InterruptedException {
+        assertJobSucceeded(container, "/rocketmq-sink_fake_to_rocketmq.conf");
+
+        String topicName = "test_topic";
+        Map<String, String> data = getRocketMqConsumerData(topicName);
+        // Check the count first so an empty result fails with a clear assertion message rather
+        // than a NoSuchElementException from the iterator below.
+        Assertions.assertEquals(10, data.size());
+
+        ObjectMapper objectMapper = new ObjectMapper();
+        String key = data.keySet().iterator().next();
+        ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
+        Assertions.assertTrue(objectNode.has("c_map"));
+        Assertions.assertTrue(objectNode.has("c_string"));
+    }
+
+    /** Runs the text-format sink job and checks that 10 records arrived in the target topic. */
+    @TestTemplate
+    public void testTextFormatSinkRocketMq(TestContainer container)
+            throws IOException, InterruptedException {
+        assertJobSucceeded(container, "/rocketmq-text-sink_fake_to_rocketmq.conf");
+        String topicName = "test_text_topic";
+        Map<String, String> data = getRocketMqConsumerData(topicName);
+        Assertions.assertEquals(10, data.size());
+    }
+
+    /** Writes ids 0..99 in text format to {@code test_topic_text} and runs the text source job. */
+    @TestTemplate
+    public void testSourceRocketMqTextToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                new DefaultSeaTunnelRowSerializer(
+                        "test_topic_text",
+                        SEATUNNEL_ROW_TYPE,
+                        SchemaFormat.TEXT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(serializer::serializeRow, "test_topic_text", 0, 100);
+        assertJobSucceeded(container, "/rocketmq-source_text_to_console.conf");
+    }
+
+    /** Writes ids 0..99 in JSON format to {@code test_topic_json} and runs the JSON source job. */
+    @TestTemplate
+    public void testSourceRocketMqJsonToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                new DefaultSeaTunnelRowSerializer(
+                        "test_topic_json",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(serializer::serializeRow, "test_topic_json", 0, 100);
+        assertJobSucceeded(container, "/rocketmq-source_json_to_console.conf");
+    }
+
+    /** Runs the source job configured in {@code rocketmq_source_latest_to_console.conf}. */
+    @TestTemplate
+    public void testRocketMqLatestToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        assertJobSucceeded(container, "/rocketmq/rocketmq_source_latest_to_console.conf");
+    }
+
+    /** Runs the source job configured in {@code rocketmq_source_earliest_to_console.conf}. */
+    @TestTemplate
+    public void testRocketMqEarliestToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        assertJobSucceeded(container, "/rocketmq/rocketmq_source_earliest_to_console.conf");
+    }
+
+    /**
+     * Runs the source job configured in {@code rocketmq_source_specific_offsets_to_console.conf}.
+     */
+    @TestTemplate
+    public void testRocketMqSpecificOffsetsToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        assertJobSucceeded(
+                container, "/rocketmq/rocketmq_source_specific_offsets_to_console.conf");
+    }
+
+    /** Runs the source job configured in {@code rocketmq_source_timestamp_to_console.conf}. */
+    @TestTemplate
+    public void testRocketMqTimestampToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        assertJobSucceeded(container, "/rocketmq/rocketmq_source_timestamp_to_console.conf");
+    }
+
+    /**
+     * Writes ids 100..149 in JSON format to {@code test_topic_group} and then runs the group
+     * offset source job.
+     */
+    @TestTemplate
+    public void testSourceRocketMqStartConfig(TestContainer container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                new DefaultSeaTunnelRowSerializer(
+                        "test_topic_group",
+                        SEATUNNEL_ROW_TYPE,
+                        DEFAULT_FORMAT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(serializer::serializeRow, "test_topic_group", 100, 150);
+        testRocketMqGroupOffsetsToConsole(container);
+    }
+
+    /**
+     * Runs the source job configured in {@code rocketmq_source_group_offset_to_console.conf}. Not a
+     * test template itself; it is invoked from {@link #testSourceRocketMqStartConfig}.
+     */
+    public void testRocketMqGroupOffsetsToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        assertJobSucceeded(container, "/rocketmq/rocketmq_source_group_offset_to_console.conf");
+    }
+
+    /**
+     * Executes a job config in the given engine container and asserts that it exits with code 0,
+     * reporting the job's stderr on failure.
+     */
+    private static void assertJobSucceeded(TestContainer container, String confFile)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob(confFile);
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    /**
+     * Sends one message per id in {@code [start, end)} to queue 0 of the given topic on the test
+     * broker. Every row carries the same fixed field values apart from its id and the current
+     * date/time.
+     *
+     * @param converter turns a row into the RocketMQ message to send
+     * @param topic target topic
+     * @param start first id, inclusive
+     * @param end last id, exclusive
+     */
+    @SneakyThrows
+    @SuppressWarnings("checkstyle:Indentation")
+    private void generateTestData(
+            ProducerRecordConverter converter, String topic, int start, int end) {
+        for (int i = start; i < end; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                Long.valueOf(i),
+                                Collections.singletonMap("key", Short.parseShort("1")),
+                                new Byte[] {Byte.parseByte("1")},
+                                "string",
+                                Boolean.FALSE,
+                                Byte.parseByte("1"),
+                                Short.parseShort("1"),
+                                Integer.parseInt("1"),
+                                Long.parseLong("1"),
+                                Float.parseFloat("1.1"),
+                                Double.parseDouble("1.1"),
+                                BigDecimal.valueOf(11, 1),
+                                // Explicit charset keeps the payload independent of the platform.
+                                "test".getBytes(StandardCharsets.UTF_8),
+                                LocalDate.now(),
+                                LocalDateTime.now()
+                            });
+            Message message = converter.convert(row);
+            producer.send(message, new MessageQueue(topic, RocketMqContainer.BROKER_NAME, 0));
+        }
+    }
+
+    /**
+     * Reads a topic until a 5 second poll returns nothing and collects the messages into a map of
+     * message keys to UTF-8 decoded body. Messages sharing the same keys overwrite each other.
+     *
+     * <p>The consumer is shut down even if reading fails part-way.
+     *
+     * @param topicName topic to read
+     * @return collected messages keyed by {@code MessageExt#getKeys()}
+     */
+    @SneakyThrows
+    private Map<String, String> getRocketMqConsumerData(String topicName) {
+        Map<String, String> data = new HashMap<>();
+        DefaultLitePullConsumer consumer =
+                RocketMqAdminUtil.initDefaultLitePullConsumer(newConfiguration(), false);
+        consumer.start();
+        try {
+            // assign
+            Map<MessageQueue, TopicOffset> queueOffsets =
+                    RetryUtils.retryWithException(
+                            () -> {
+                                return RocketMqAdminUtil.offsetTopics(
+                                                newConfiguration(), Lists.newArrayList(topicName))
+                                        .get(0);
+                            },
+                            new RetryUtils.RetryMaterial(
+                                    Constant.OPERATION_RETRY_TIME,
+                                    false,
+                                    exception -> exception instanceof RocketMqConnectorException,
+                                    Constant.OPERATION_RETRY_SLEEP));
+            consumer.assign(queueOffsets.keySet());
+            // seek to offset: the reported current offset if there is one, else the queue minimum
+            Map<MessageQueue, Long> currentOffsets =
+                    RocketMqAdminUtil.currentOffsets(
+                            newConfiguration(),
+                            Lists.newArrayList(topicName),
+                            queueOffsets.keySet());
+            for (MessageQueue mq : queueOffsets.keySet()) {
+                long currentOffset =
+                        currentOffsets.containsKey(mq)
+                                ? currentOffsets.get(mq)
+                                : queueOffsets.get(mq).getMinOffset();
+                consumer.seek(mq, currentOffset);
+            }
+            while (true) {
+                List<MessageExt> messages = consumer.poll(5000);
+                if (messages.isEmpty()) {
+                    break;
+                }
+                for (MessageExt message : messages) {
+                    data.put(
+                            message.getKeys(),
+                            new String(message.getBody(), StandardCharsets.UTF_8));
+                }
+                consumer.commitSync();
+            }
+        } finally {
+            consumer.shutdown();
+        }
+        log.info("Consumer {} data total {}", topicName, data.size());
+        return data;
+    }
+
+    /**
+     * Builds a connection configuration for the test broker: the shared group id, ACL disabled and
+     * the container's name server address.
+     */
+    public RocketMqBaseConfiguration newConfiguration() {
+        return RocketMqBaseConfiguration.newBuilder()
+                .groupId(ROCKETMQ_GROUP)
+                .aclEnable(false)
+                .namesrvAddr(rocketMqContainer.getNameSrvAddr())
+                .build();
+    }
+
+    /** Converts a row into the RocketMQ message that should be produced for it. */
+    @FunctionalInterface
+    interface ProducerRecordConverter {
+        Message convert(SeaTunnelRow row);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/log4j2-test.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..e0bfc5a7b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/log4j2-test.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+
+# Root logger level is INFO so that test output is visible;
+# set it to OFF to avoid flooding build logs
+rootLogger.level=INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
+
+
+logger.testcontainers.name=org.testcontainers
+logger.testcontainers.level=INFO
+
+logger.dockerjava.name=com.github.dockerjava
+logger.dockerjava.level=INFO
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-sink_fake_to_rocketmq.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-sink_fake_to_rocketmq.conf
new file mode 100644
index 000000000..ca21a11ce
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-sink_fake_to_rocketmq.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields {
+ c_map = "map<string, smallint>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topic = "test_topic"
+ partition.key.fields = ["c_map","c_string"]
+ producer.send.sync = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
new file mode 100644
index 000000000..40f4f1f2d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topics = "test_topic_json"
+ result_table_name = "rocketmq_table"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+
+}
+
+transform {
+}
+
+sink {
+ Console {}
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
new file mode 100644
index 000000000..d04cda5b4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topics = "test_topic_text"
+ result_table_name = "rocketmq_table"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ format = text
+ # The default field delimiter is ","
+ field_delimiter = ","
+ }
+}
+
+transform {
+}
+
+sink {
+ Console {}
+ Assert {
+ rules = {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-text-sink_fake_to_rocketmq.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-text-sink_fake_to_rocketmq.conf
new file mode 100644
index 000000000..935936dc3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-text-sink_fake_to_rocketmq.conf
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ row.num = 10
+ map.size = 10
+ array.size = 10
+ bytes.length = 10
+ string.length = 10
+ schema = {
+ fields {
+ c_map = "map<string, smallint>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_null = "null"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topic = "test_text_topic"
+ format = text
+ partition.key.fields = ["c_map","c_string"]
+ producer.send.sync = true
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
new file mode 100644
index 000000000..07e3ad2bf
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in a SeaTunnel config
+######
+
+env {
+ # You can set flink configuration here
+ job.mode = "BATCH"
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topics = "test_topic_source"
+ result_table_name = "rocketmq_table"
+ format = json
+ start.mode = "CONSUME_FROM_FIRST_OFFSET"
+ schema = {
+ fields {
+ id = bigint
+ }
+ }
+ }
+}
+
+transform {
+ }
+
+sink {
+ Console {}
+ Assert {
+ rules = {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
new file mode 100644
index 000000000..ce881c0e9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topics = "test_topic_group"
+ result_table_name = "rocketmq_table"
+ format = json
+ start.mode = "CONSUME_FROM_GROUP_OFFSETS"
+ schema = {
+ fields {
+ id = bigint
+ }
+ }
+ }
+}
+
+transform {
+ }
+
+sink {
+ Console {}
+ Assert {
+ rules = {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+
+ {
+ rule_type = MIN
+ rule_value = 100
+ },
+ {
+ rule_type = MAX
+ rule_value = 149
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
new file mode 100644
index 000000000..65eae9194
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topics = "test_topic_source"
+ result_table_name = "rocketmq_table"
+ format = json
+ start.mode = "CONSUME_FROM_LAST_OFFSET"
+ schema = {
+ fields {
+ id = bigint
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Console {}
+ Assert {
+ rules = {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 99
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
new file mode 100644
index 000000000..7b33f49c4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topics = "test_topic_source"
+ result_table_name = "rocketmq_table"
+ # Optional: the format defaults to json when this setting is omitted
+ format = json
+ start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
+ schema = {
+ fields {
+ id = bigint
+ }
+ }
+
+ start.mode.offsets = {
+ test_topic_source-0 = 50
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ Console {}
+ Assert {
+ rules = {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = MIN
+ rule_value = 50
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
new file mode 100644
index 000000000..2f55a8005
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topics = "test_topic_source"
+ result_table_name = "rocketmq_table"
+ # Optional: the format defaults to json when this setting is omitted
+ format = json
+ start.mode = "CONSUME_FROM_TIMESTAMP"
+ schema = {
+ fields {
+ id = bigint
+ }
+ }
+ start.mode.timestamp = 1667179890315
+ }
+}
+
+transform {
+}
+
+sink {
+ Console {}
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index a0e839d77..bce102766 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -53,6 +53,7 @@
<module>connector-mongodb-e2e</module>
<module>connector-hbase-e2e</module>
<module>connector-maxcompute-e2e</module>
+ <module>connector-rocketmq-e2e</module>
</modules>
<dependencies>