You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2023/04/14 03:13:43 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add rocketmq source and sink (#4007)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e33389755 [Feature][Connector-V2] Add rocketmq source and sink (#4007)
e33389755 is described below

commit e333897552e4fd03bda1688c26251d1d674fc318
Author: Xiaojian Sun <su...@163.com>
AuthorDate: Fri Apr 14 11:13:35 2023 +0800

    [Feature][Connector-V2] Add rocketmq source and sink (#4007)
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |  16 +
 docs/en/connector-v2/sink/RocketMQ.md              |  82 +++++
 docs/en/connector-v2/source/RocketMQ.md            | 142 ++++++++
 plugin-mapping.properties                          |   3 +-
 release-note.md                                    |   1 +
 seatunnel-connectors-v2/connector-rocketmq/pom.xml |  61 ++++
 .../rocketmq/common/RocketMqAdminUtil.java         | 319 ++++++++++++++++++
 .../rocketmq/common/RocketMqBaseConfiguration.java | 255 ++++++++++++++
 .../seatunnel/rocketmq/common/SchemaFormat.java    |  48 +++
 .../seatunnel/rocketmq/common/StartMode.java       |  27 ++
 .../seatunnel/rocketmq/config/Config.java          |  68 ++++
 .../seatunnel/rocketmq/config/ConsumerConfig.java  |  89 +++++
 .../seatunnel/rocketmq/config/ProducerConfig.java  |  71 ++++
 .../exception/RocketMqConnectorErrorCode.java      |  71 ++++
 .../exception/RocketMqConnectorException.java      |  36 ++
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 127 +++++++
 .../rocketmq/serialize/SeaTunnelRowSerializer.java |  33 ++
 .../seatunnel/rocketmq/sink/ProducerMetadata.java  |  47 +++
 .../rocketmq/sink/RocketMqNoTransactionSender.java | 102 ++++++
 .../rocketmq/sink/RocketMqProducerSender.java      |  26 ++
 .../seatunnel/rocketmq/sink/RocketMqSink.java      | 170 ++++++++++
 .../rocketmq/sink/RocketMqSinkFactory.java         |  48 +++
 .../rocketmq/sink/RocketMqSinkWriter.java          | 102 ++++++
 .../rocketmq/sink/RocketMqTransactionSender.java   |  80 +++++
 .../rocketmq/source/ConsumerMetadata.java          |  40 +++
 .../rocketmq/source/RocketMqConsumerThread.java    |  69 ++++
 .../seatunnel/rocketmq/source/RocketMqSource.java  | 288 ++++++++++++++++
 .../rocketmq/source/RocketMqSourceFactory.java     |  66 ++++
 .../rocketmq/source/RocketMqSourceReader.java      | 259 ++++++++++++++
 .../rocketmq/source/RocketMqSourceSplit.java       |  79 +++++
 .../source/RocketMqSourceSplitEnumerator.java      | 352 +++++++++++++++++++
 .../rocketmq/source/RocketMqSourceState.java       |  38 +++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   7 +
 .../{ => connector-rocketmq-e2e}/pom.xml           |  60 +---
 .../e2e/connector/rocketmq/RocketMqContainer.java  | 102 ++++++
 .../e2e/connector/rocketmq/RocketMqIT.java         | 372 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  36 ++
 .../resources/rocketmq-sink_fake_to_rocketmq.conf  |  69 ++++
 .../resources/rocketmq-source_json_to_console.conf |  91 +++++
 .../resources/rocketmq-source_text_to_console.conf |  91 +++++
 .../rocketmq-text-sink_fake_to_rocketmq.conf       |  70 ++++
 .../rocketmq_source_earliest_to_console.conf       |  69 ++++
 .../rocketmq_source_group_offset_to_console.conf   |  69 ++++
 .../rocketmq_source_latest_to_console.conf         |  68 ++++
 ...ocketmq_source_specific_offsets_to_console.conf |  73 ++++
 .../rocketmq_source_timestamp_to_console.conf      |  72 ++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 48 files changed, 4419 insertions(+), 47 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 29bb1adbf..7c137264b 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -252,3 +252,19 @@ problems encountered by users.
 |---------------------------|------------------------|-------------------------|
 | FILTER_FIELD_TRANSFORM-01 | filter field not found | filter field not found. |
 
+## RocketMq Connector Error Codes
+
+|    code     |                                           description                                           |                                                           solution                                                            |
+|-------------|-------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
+| ROCKETMQ-01 | Add a split back to the split enumerator failed, it will only happen when a SourceReader failed | When users encounter this error code, it means that add a split back to the split enumerator failed, please check it.         |
+| ROCKETMQ-02 | Add the split checkpoint state to reader failed                                                 | When users encounter this error code, it means that add the split checkpoint state to reader failed, please check it.         |
+| ROCKETMQ-03 | Rocketmq failed to consume data                                                                 | When users encounter this error code, it means that rocketmq failed to consume data, please check it.                         |
+| ROCKETMQ-04 | Error occurred when the rocketmq consumer thread was running                                    | When the user encounters this error code, it means that an error occurred while running the Rocketmq consumer thread          |
+| ROCKETMQ-05 | Rocketmq producer failed to send message                                                        | When users encounter this error code, it means that Rocketmq producer failed to send message, please check it.                |
+| ROCKETMQ-06 | Rocketmq producer failed to start                                                               | When users encounter this error code, it means that Rocketmq producer failed to start, please check it.                       |
+| ROCKETMQ-07 | Rocketmq consumer failed to start                                                               | When users encounter this error code, it means that Rocketmq consumer failed to start, please check it.                       |
+| ROCKETMQ-08 | Unsupported start mode                                                                          | When users encounter this error code, it means that the configured start mode is not supported, please check it.              |
+| ROCKETMQ-09 | Failed to get the offsets of the current consumer group                                         | When users encounter this error code, it means that failed to get the offsets of the current consumer group, please check it. |
+| ROCKETMQ-10 | Failed to search offset through timestamp                                                       | When users encounter this error code, it means that failed to search offset through timestamp, please check it.               |
+| ROCKETMQ-11 | Failed to get topic min and max offset                                                          | When users encounter this error code, it means that failed to get topic min and max offset, please check it.                  |
+
diff --git a/docs/en/connector-v2/sink/RocketMQ.md b/docs/en/connector-v2/sink/RocketMQ.md
new file mode 100644
index 000000000..703192021
--- /dev/null
+++ b/docs/en/connector-v2/sink/RocketMQ.md
@@ -0,0 +1,82 @@
+# RocketMQ
+
+> RocketMQ sink connector
+
+## Description
+
+Write rows to an Apache RocketMQ topic.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+When `semantic` is set to `EXACTLY_ONCE`, 2pc is used to guarantee that each message is sent to RocketMQ exactly once. The default semantic is `NON`.
+
+## Options
+
+|         name         |  type   | required |      default value       |
+|----------------------|---------|----------|--------------------------|
+| topic                | string  | yes      | -                        |
+| name.srv.addr        | string  | yes      | -                        |
+| acl.enabled          | Boolean | no       | false                    |
+| access.key           | String  | no       |                          |
+| secret.key           | String  | no       |                          |
+| producer.group       | String  | no       | SeaTunnel-producer-Group |
+| semantic             | string  | no       | NON                      |
+| partition.key.fields | array   | no       | -                        |
+| format               | String  | no       | json                     |
+| field.delimiter      | String  | no       | ,                        |
+| common-options       | config  | no       | -                        |
+
+### topic [string]
+
+`RocketMQ topic` name.
+
+### name.srv.addr [string]
+
+`RocketMQ` name server cluster address.
+
+### semantic [string]
+
+Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.
+
+### partition.key.fields [array]
+
+Configure which fields are used as the key of the RocketMQ message.
+
+For example, if you want to use value of fields from upstream data as key, you can assign field names to this property.
+
+Upstream data is the following:
+
+| name | age |     data      |
+|------|-----|---------------|
+| Jack | 16  | data-example1 |
+| Mary | 23  | data-example2 |
+
+If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.
+
+### format
+
+Data format. The default format is json. Optional text format. The default field separator is ",".
+If you customize the delimiter, add the "field.delimiter" option.
+
+### field.delimiter
+
+Customize the field delimiter for data format.
+
+### common options [config]
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+sink {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topic = "test-topic-003"
+    partition.key.fields = ["name"]
+  }
+}
+```
+
diff --git a/docs/en/connector-v2/source/RocketMQ.md b/docs/en/connector-v2/source/RocketMQ.md
new file mode 100644
index 000000000..fd209ce70
--- /dev/null
+++ b/docs/en/connector-v2/source/RocketMQ.md
@@ -0,0 +1,142 @@
+# RocketMQ
+
+> RocketMQ source connector
+
+## Description
+
+Source connector for Apache RocketMQ.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [x] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+|                name                 |  type   | required |       default value        |
+|-------------------------------------|---------|----------|----------------------------|
+| topics                              | String  | yes      | -                          |
+| name.srv.addr                       | String  | yes      | -                          |
+| acl.enabled                         | Boolean | no       | false                      |
+| access.key                          | String  | no       |                            |
+| secret.key                          | String  | no       |                            |
+| batch.size                          | int     | no       | 100                        |
+| consumer.group                      | String  | no       | SeaTunnel-Consumer-Group   |
+| commit.on.checkpoint                | Boolean | no       | true                       |
+| schema                              |         | no       | -                          |
+| format                              | String  | no       | json                       |
+| field.delimiter                     | String  | no       | ,                          |
+| start.mode                          | String  | no       | CONSUME_FROM_GROUP_OFFSETS |
+| start.mode.offsets                  |         | no       |                            |
+| start.mode.timestamp                | Long    | no       |                            |
+| partition.discovery.interval.millis | long    | no       | -1                         |
+| common-options                      | config  | no       | -                          |
+
+### topics [string]
+
+`RocketMQ topic` name. If there are multiple `topics`, use `,` to split, for example: `"tpc1,tpc2"`.
+
+### name.srv.addr [string]
+
+`RocketMQ` name server cluster address.
+
+### consumer.group [string]
+
+`RocketMQ consumer group id`, used to distinguish different consumer groups.
+
+### acl.enabled [boolean]
+
+If true, access control is enabled, and access key and secret key need to be configured.
+
+### access.key [string]
+
+When `acl.enabled` is true, access key cannot be empty.
+
+### secret.key [string]
+
+When `acl.enabled` is true, secret key cannot be empty.
+
+### batch.size [int]
+
+`RocketMQ` consumer pull batch size
+
+### commit.on.checkpoint [boolean]
+
+If true the consumer's offset will be periodically committed in the background.
+
+### partition.discovery.interval.millis [long]
+
+The interval for dynamically discovering topics and partitions.
+
+### schema
+
+The structure of the data, including field names and field types.
+
+### format
+
+Data format. The default format is json. Optional text format. The default field separator is ",".
+If you customize the delimiter, add the "field.delimiter" option.
+
+### field.delimiter
+
+Customize the field delimiter for data format.
+
+### start.mode
+
+The initial consumption mode of the consumer. The supported values are:
+`CONSUME_FROM_LAST_OFFSET`, `CONSUME_FROM_FIRST_OFFSET`, `CONSUME_FROM_GROUP_OFFSETS`, `CONSUME_FROM_TIMESTAMP`
+and `CONSUME_FROM_SPECIFIC_OFFSETS`.
+
+### start.mode.timestamp
+
+The timestamp to start consuming from. Required when `start.mode` is "CONSUME_FROM_TIMESTAMP".
+
+### start.mode.offsets
+
+The offsets to start consuming from. Required when `start.mode` is "CONSUME_FROM_SPECIFIC_OFFSETS".
+
+for example:
+
+```hocon
+start.mode.offsets = {
+         topic1-0 = 70
+         topic1-1 = 10
+         topic1-2 = 10
+      }
+```
+
+### common-options [config]
+
+Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+
+## Example
+
+### Simple
+
+```hocon
+source {
+  Rocketmq {
+    name.srv.addr = "localhost:9876"
+    topics = "test-topic-002"
+    consumer.group = "consumer-group"
+    parallelism = 2
+    batch.size = 20
+    schema = {
+       fields {
+            age = int
+            name = string
+       }
+     }
+    start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
+    start.mode.offsets = {
+                test-topic-002-0 = 20
+             }
+            
+  }
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index d72508717..87debbe7e 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -105,4 +105,5 @@ seatunnel.source.Persistiq = connector-http-persistiq
 seatunnel.sink.SelectDBCloud = connector-selectdb-cloud
 seatunnel.sink.Hbase = connector-hbase
 seatunnel.source.StarRocks = connector-starrocks
-
+seatunnel.source.Rocketmq = connector-rocketmq
+seatunnel.sink.Rocketmq = connector-rocketmq
diff --git a/release-note.md b/release-note.md
index 65ae1abab..5554c24d8 100644
--- a/release-note.md
+++ b/release-note.md
@@ -19,6 +19,7 @@
 - [Hbase] Add hbase sink connector #4049
 - [Github] Add Github source connector #4155
 - [CDC] Support export debezium-json format to kafka #4339
+- [RocketMQ] Add RocketMQ source and sink connector #4007
 ### Formats
 - [Canal]Support read canal format message #3950
 
diff --git a/seatunnel-connectors-v2/connector-rocketmq/pom.xml b/seatunnel-connectors-v2/connector-rocketmq/pom.xml
new file mode 100644
index 000000000..5076eaaad
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <artifactId>connector-rocketmq</artifactId>
+
+    <properties>
+        <rocketmq.version>4.9.4</rocketmq.version>
+    </properties>
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
new file mode 100644
index 000000000..ee831257a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqAdminUtil.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Static helpers for creating RocketMq clients (lite pull consumer, producers) and for querying
+ * or creating topics and offsets through the RocketMq admin tool.
+ *
+ * <p>Each admin operation starts its own {@link DefaultMQAdminExt} and shuts it down before
+ * returning.
+ */
+public class RocketMqAdminUtil {
+
+    /**
+     * Builds a client instance name by appending a random UUID to the given prefix.
+     *
+     * @param prefix leading part of the name; must not be null
+     * @return {@code prefix + "-" + <random UUID>}
+     */
+    public static String createUniqInstance(String prefix) {
+        return prefix.concat("-").concat(UUID.randomUUID().toString());
+    }
+
+    /**
+     * Creates an ACL RPC hook carrying the given credentials.
+     *
+     * @param accessKey ACL access key
+     * @param secretKey ACL secret key
+     * @return a new {@link AclClientRPCHook} wrapping the credentials
+     */
+    public static RPCHook getAclRpcHook(String accessKey, String secretKey) {
+        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
+    }
+
+    /**
+     * Creates (but does not start) a lite pull consumer for the configured group.
+     *
+     * <p>An ACL hook is attached unless both the access key and the secret key are blank. The
+     * pull batch size is only overridden when it is configured.
+     *
+     * @param config connection settings (name server address, group id, credentials, batch size)
+     * @param autoCommit whether the consumer commits offsets automatically
+     * @return the configured, not yet started consumer
+     */
+    public static DefaultLitePullConsumer initDefaultLitePullConsumer(
+            RocketMqBaseConfiguration config, boolean autoCommit) {
+        DefaultLitePullConsumer consumer;
+        if (StringUtils.isBlank(config.getAccessKey())
+                && StringUtils.isBlank(config.getSecretKey())) {
+            consumer = new DefaultLitePullConsumer(config.getGroupId());
+        } else {
+            consumer =
+                    new DefaultLitePullConsumer(
+                            config.getGroupId(),
+                            getAclRpcHook(config.getAccessKey(), config.getSecretKey()));
+        }
+        consumer.setNamesrvAddr(config.getNamesrvAddr());
+        // The same unique name is used for both the instance name and the unit name.
+        String uniqueName = createUniqInstance(config.getNamesrvAddr());
+        consumer.setInstanceName(uniqueName);
+        consumer.setUnitName(uniqueName);
+        consumer.setAutoCommit(autoCommit);
+        if (config.getBatchSize() != null) {
+            consumer.setPullBatchSize(config.getBatchSize());
+        }
+        return consumer;
+    }
+
+    /**
+     * Creates (but does not start) a transactional producer for the configured group.
+     *
+     * @param config connection settings (name server address, group id, ACL, producer limits)
+     * @param listener transaction listener registered on the producer
+     * @return the configured, not yet started transactional producer
+     */
+    public static TransactionMQProducer initTransactionMqProducer(
+            RocketMqBaseConfiguration config, TransactionListener listener) {
+        TransactionMQProducer producer =
+                new TransactionMQProducer(config.getGroupId(), aclRpcHookIfEnabled(config));
+        producer.setNamesrvAddr(config.getNamesrvAddr());
+        producer.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
+        producer.setLanguage(LanguageCode.JAVA);
+        producer.setTransactionListener(listener);
+        applyProducerLimits(producer, config);
+        return producer;
+    }
+
+    /**
+     * Creates (but does not start) a non-transactional producer for the configured group.
+     *
+     * @param config connection settings (name server address, group id, ACL, producer limits)
+     * @return the configured, not yet started producer
+     */
+    public static DefaultMQProducer initDefaultMqProducer(RocketMqBaseConfiguration config) {
+        DefaultMQProducer producer = new DefaultMQProducer(aclRpcHookIfEnabled(config));
+        producer.setNamesrvAddr(config.getNamesrvAddr());
+        producer.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
+        producer.setProducerGroup(config.getGroupId());
+        producer.setLanguage(LanguageCode.JAVA);
+        applyProducerLimits(producer, config);
+        return producer;
+    }
+
+    /** Returns an ACL hook built from the configured keys when ACL is enabled, else null. */
+    private static RPCHook aclRpcHookIfEnabled(RocketMqBaseConfiguration config) {
+        Objects.requireNonNull(config, "config");
+        if (!config.isAclEnable()) {
+            return null;
+        }
+        return getAclRpcHook(config.getAccessKey(), config.getSecretKey());
+    }
+
+    /**
+     * Applies the optional max message size and send timeout to the producer. Each value is
+     * checked on its own; null or non-positive values are skipped so the client defaults remain.
+     */
+    private static void applyProducerLimits(
+            DefaultMQProducer producer, RocketMqBaseConfiguration config) {
+        Integer maxMessageSize = config.getMaxMessageSize();
+        if (maxMessageSize != null && maxMessageSize > 0) {
+            producer.setMaxMessageSize(maxMessageSize);
+        }
+        Integer sendMsgTimeout = config.getSendMsgTimeout();
+        if (sendMsgTimeout != null && sendMsgTimeout > 0) {
+            producer.setSendMsgTimeout(sendMsgTimeout);
+        }
+    }
+
+    /**
+     * Creates and starts an admin client; the caller is responsible for shutting it down.
+     *
+     * @throws MQClientException if the admin client fails to start
+     */
+    private static DefaultMQAdminExt startMQAdminTool(RocketMqBaseConfiguration config)
+            throws MQClientException {
+        RPCHook rpcHook = aclRpcHookIfEnabled(config);
+        DefaultMQAdminExt admin =
+                rpcHook == null ? new DefaultMQAdminExt() : new DefaultMQAdminExt(rpcHook);
+        admin.setNamesrvAddr(config.getNamesrvAddr());
+        admin.setAdminExtGroup(config.getGroupId());
+        admin.setInstanceName(createUniqInstance(config.getNamesrvAddr()));
+        admin.start();
+        return admin;
+    }
+
+    /** Shuts the admin client down if it was created. */
+    private static void shutdown(DefaultMQAdminExt admin) {
+        if (admin != null) {
+            admin.shutdown();
+        }
+    }
+
+    /** Re-sets the interrupt flag when the wrapped failure is an {@link InterruptedException}. */
+    private static void restoreInterruptIfNeeded(Exception e) {
+        if (e instanceof InterruptedException) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Creates or updates the topic on every master broker of every cluster.
+     *
+     * @param config connection settings
+     * @param topicConfig topic definition to apply
+     * @throws RocketMqConnectorException with {@code CREATE_TOPIC_ERROR} on any failure
+     */
+    public static void createTopic(RocketMqBaseConfiguration config, TopicConfig topicConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(config);
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+            for (String clusterName : clusterAddrTable.keySet()) {
+                Set<String> masterSet =
+                        CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+                }
+            }
+        } catch (Exception e) {
+            restoreInterruptIfNeeded(e);
+            throw new RocketMqConnectorException(RocketMqConnectorErrorCode.CREATE_TOPIC_ERROR, e);
+        } finally {
+            shutdown(defaultMQAdminExt);
+        }
+    }
+
+    /**
+     * Checks whether route information exists for the topic.
+     *
+     * @param config connection settings
+     * @param topic topic name to look up
+     * @return true if route data was found; false if the lookup reported {@code TOPIC_NOT_EXIST}
+     * @throws RocketMqConnectorException with {@code TOPIC_NOT_EXIST_ERROR} on any other failure
+     */
+    public static boolean topicExist(RocketMqBaseConfiguration config, String topic) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(config);
+            TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+            return topicRouteData != null;
+        } catch (MQClientException e) {
+            // A missing topic is an expected answer, not an error.
+            if (e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
+                return false;
+            }
+            throw new RocketMqConnectorException(
+                    RocketMqConnectorErrorCode.TOPIC_NOT_EXIST_ERROR, e);
+        } catch (Exception e) {
+            restoreInterruptIfNeeded(e);
+            throw new RocketMqConnectorException(
+                    RocketMqConnectorErrorCode.TOPIC_NOT_EXIST_ERROR, e);
+        } finally {
+            shutdown(defaultMQAdminExt);
+        }
+    }
+
+    /**
+     * Fetches the per-queue offset table of each topic, in the order of {@code topics}.
+     *
+     * @param config connection settings
+     * @param topics topics to examine
+     * @return one offset table per topic
+     * @throws RocketMqConnectorException with {@code GET_MIN_AND_MAX_OFFSETS_ERROR} on failure
+     */
+    public static List<Map<MessageQueue, TopicOffset>> offsetTopics(
+            RocketMqBaseConfiguration config, List<String> topics) {
+        List<Map<MessageQueue, TopicOffset>> offsets = Lists.newArrayList();
+        DefaultMQAdminExt adminClient = null;
+        try {
+            adminClient = startMQAdminTool(config);
+            for (String topic : topics) {
+                TopicStatsTable topicStatsTable = adminClient.examineTopicStats(topic);
+                offsets.add(topicStatsTable.getOffsetTable());
+            }
+            return offsets;
+        } catch (MQClientException
+                | MQBrokerException
+                | RemotingException
+                | InterruptedException e) {
+            restoreInterruptIfNeeded(e);
+            throw new RocketMqConnectorException(
+                    RocketMqConnectorErrorCode.GET_MIN_AND_MAX_OFFSETS_ERROR, e);
+        } finally {
+            shutdown(adminClient);
+        }
+    }
+
+    /**
+     * Merges the offset tables of all given topics into a single map.
+     *
+     * @param config connection settings
+     * @param topics topics to examine
+     * @return the combined queue-to-offset map of all topics
+     */
+    public static Map<MessageQueue, TopicOffset> flatOffsetTopics(
+            RocketMqBaseConfiguration config, List<String> topics) {
+        Map<MessageQueue, TopicOffset> messageQueueTopicOffsets = Maps.newConcurrentMap();
+        offsetTopics(config, topics).forEach(messageQueueTopicOffsets::putAll);
+        return messageQueueTopicOffsets;
+    }
+
+    /**
+     * Looks up, for every queue, the offset that corresponds to the given timestamp.
+     *
+     * @param config connection settings
+     * @param messageQueues queues to search
+     * @param timestamp timestamp passed to the admin client's offset search; must not be null
+     * @return the offset found for each queue
+     * @throws RocketMqConnectorException with {@code GET_CONSUMER_GROUP_OFFSETS_TIMESTAMP_ERROR}
+     *     on failure
+     */
+    public static Map<MessageQueue, Long> searchOffsetsByTimestamp(
+            RocketMqBaseConfiguration config,
+            Collection<MessageQueue> messageQueues,
+            Long timestamp) {
+        Map<MessageQueue, Long> offsets = Maps.newConcurrentMap();
+        DefaultMQAdminExt adminClient = null;
+        try {
+            adminClient = startMQAdminTool(config);
+            for (MessageQueue messageQueue : messageQueues) {
+                long offset = adminClient.searchOffset(messageQueue, timestamp);
+                offsets.put(messageQueue, offset);
+            }
+            return offsets;
+        } catch (MQClientException e) {
+            throw new RocketMqConnectorException(
+                    RocketMqConnectorErrorCode.GET_CONSUMER_GROUP_OFFSETS_TIMESTAMP_ERROR, e);
+        } finally {
+            shutdown(adminClient);
+        }
+    }
+
+    /**
+     * Reads the configured consumer group's current offsets for the given topics, restricted to
+     * the requested queues.
+     *
+     * @param config connection settings; its group id identifies the consumer group
+     * @param topics topics whose consume stats are examined
+     * @param messageQueues queues to keep in the result
+     * @return consumer offset per requested queue; an empty map if the lookup reported {@code
+     *     TOPIC_NOT_EXIST}
+     * @throws RocketMqConnectorException with {@code GET_CONSUMER_GROUP_OFFSETS_ERROR} on any
+     *     other failure
+     */
+    public static Map<MessageQueue, Long> currentOffsets(
+            RocketMqBaseConfiguration config,
+            List<String> topics,
+            Set<MessageQueue> messageQueues) {
+        DefaultMQAdminExt adminClient = null;
+        try {
+            adminClient = startMQAdminTool(config);
+            Map<MessageQueue, OffsetWrapper> consumerOffsets = Maps.newConcurrentMap();
+            for (String topic : topics) {
+                ConsumeStats consumeStats =
+                        adminClient.examineConsumeStats(config.getGroupId(), topic);
+                consumerOffsets.putAll(consumeStats.getOffsetTable());
+            }
+            return consumerOffsets.entrySet().stream()
+                    .filter(entry -> messageQueues.contains(entry.getKey()))
+                    .collect(
+                            Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    entry -> entry.getValue().getConsumerOffset()));
+        } catch (MQClientException e) {
+            // A missing topic simply means there are no offsets yet.
+            if (e.getResponseCode() == ResponseCode.TOPIC_NOT_EXIST) {
+                return Collections.emptyMap();
+            }
+            throw new RocketMqConnectorException(
+                    RocketMqConnectorErrorCode.GET_CONSUMER_GROUP_OFFSETS_ERROR, e);
+        } catch (MQBrokerException | RemotingException | InterruptedException e) {
+            restoreInterruptIfNeeded(e);
+            throw new RocketMqConnectorException(
+                    RocketMqConnectorErrorCode.GET_CONSUMER_GROUP_OFFSETS_ERROR, e);
+        } finally {
+            shutdown(adminClient);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqBaseConfiguration.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqBaseConfiguration.java
new file mode 100644
index 000000000..eba809d65
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/RocketMqBaseConfiguration.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Configuration for connecting RocketMq */
+@Setter
+@Getter
+public class RocketMqBaseConfiguration implements Serializable {
+    private String namesrvAddr;
+    private String groupId;
+    /** set acl config */
+    private boolean aclEnable;
+
+    private String accessKey;
+    private String secretKey;
+
+    // consumer
+    private Integer batchSize;
+    private Long pollTimeoutMillis;
+
+    // producer
+    private Integer maxMessageSize;
+    private Integer sendMsgTimeout;
+
+    private RocketMqBaseConfiguration(
+            String groupId,
+            String namesrvAddr,
+            boolean aclEnable,
+            String accessKey,
+            String secretKey) {
+        this.groupId = groupId;
+        this.namesrvAddr = namesrvAddr;
+        this.aclEnable = aclEnable;
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
+    }
+
+    private RocketMqBaseConfiguration(
+            String groupId,
+            String namesrvAddr,
+            boolean aclEnable,
+            String accessKey,
+            String secretKey,
+            int pullBatchSize,
+            Long consumerPullTimeoutMillis) {
+        this(groupId, namesrvAddr, aclEnable, accessKey, secretKey);
+        this.batchSize = pullBatchSize;
+        this.pollTimeoutMillis = consumerPullTimeoutMillis;
+    }
+
+    private RocketMqBaseConfiguration(
+            String groupId,
+            String namesrvAddr,
+            boolean aclEnable,
+            String accessKey,
+            String secretKey,
+            int maxMessageSize,
+            int sendMsgTimeout) {
+
+        this(groupId, namesrvAddr, aclEnable, accessKey, secretKey);
+        this.maxMessageSize = maxMessageSize;
+        this.sendMsgTimeout = sendMsgTimeout;
+    }
+
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RocketMqBaseConfiguration that = (RocketMqBaseConfiguration) o;
+        return aclEnable == that.aclEnable
+                && batchSize == that.batchSize
+                && pollTimeoutMillis == that.pollTimeoutMillis
+                && maxMessageSize == that.maxMessageSize
+                && sendMsgTimeout == that.sendMsgTimeout
+                && Objects.equals(namesrvAddr, that.namesrvAddr)
+                && Objects.equals(groupId, that.groupId)
+                && Objects.equals(accessKey, that.accessKey)
+                && Objects.equals(secretKey, that.secretKey);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                namesrvAddr,
+                groupId,
+                aclEnable,
+                accessKey,
+                secretKey,
+                batchSize,
+                pollTimeoutMillis,
+                maxMessageSize,
+                sendMsgTimeout);
+    }
+
+    @Override
+    public String toString() {
+        return "RocketMqBaseConfiguration{"
+                + "namesrvAddr='"
+                + namesrvAddr
+                + '\''
+                + ", groupId='"
+                + groupId
+                + '\''
+                + ", aclEnable="
+                + aclEnable
+                + ", accessKey='"
+                + accessKey
+                + '\''
+                + ", secretKey='"
+                + secretKey
+                + '\''
+                + ", pullBatchSize="
+                + batchSize
+                + ", pollTimeoutMillis="
+                + pollTimeoutMillis
+                + ", maxMessageSize="
+                + maxMessageSize
+                + ", sendMsgTimeout="
+                + sendMsgTimeout
+                + '}';
+    }
+
+    enum ConfigType {
+        NONE,
+        CONSUMER,
+        PRODUCER
+    }
+
+    public static class Builder {
+        private String namesrvAddr;
+        private String groupId;
+        private boolean aclEnable;
+        private String accessKey;
+        private String secretKey;
+        // consumer
+        private Integer batchSize;
+        private Long pollTimeoutMillis;
+
+        // producer
+        private Integer maxMessageSize;
+        private Integer sendMsgTimeout;
+
+        private ConfigType configType = ConfigType.NONE;
+
+        public Builder consumer() {
+            this.configType = ConfigType.CONSUMER;
+            return this;
+        }
+
+        public Builder producer() {
+            this.configType = ConfigType.PRODUCER;
+            return this;
+        }
+
+        public Builder namesrvAddr(String namesrvAddr) {
+            this.namesrvAddr = namesrvAddr;
+            return this;
+        }
+
+        public Builder groupId(String groupId) {
+            this.groupId = groupId;
+            return this;
+        }
+
+        public Builder aclEnable(boolean aclEnable) {
+            this.aclEnable = aclEnable;
+            return this;
+        }
+
+        public Builder accessKey(String accessKey) {
+            this.accessKey = accessKey;
+            return this;
+        }
+
+        public Builder secretKey(String secretKey) {
+            this.secretKey = secretKey;
+            return this;
+        }
+
+        public Builder batchSize(int batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        public Builder pollTimeoutMillis(long consumerPullTimeoutMillis) {
+            this.pollTimeoutMillis = consumerPullTimeoutMillis;
+            return this;
+        }
+
+        public Builder maxMessageSize(int maxMessageSize) {
+            this.maxMessageSize = maxMessageSize;
+            return this;
+        }
+
+        public Builder sendMsgTimeout(int sendMsgTimeout) {
+            this.sendMsgTimeout = sendMsgTimeout;
+            return this;
+        }
+
+        public RocketMqBaseConfiguration build() {
+            switch (configType) {
+                case CONSUMER:
+                    return new RocketMqBaseConfiguration(
+                            groupId,
+                            namesrvAddr,
+                            aclEnable,
+                            accessKey,
+                            secretKey,
+                            batchSize,
+                            pollTimeoutMillis);
+                case PRODUCER:
+                    return new RocketMqBaseConfiguration(
+                            groupId,
+                            namesrvAddr,
+                            aclEnable,
+                            accessKey,
+                            secretKey,
+                            maxMessageSize,
+                            sendMsgTimeout);
+                default:
+                    return new RocketMqBaseConfiguration(
+                            groupId, namesrvAddr, aclEnable, accessKey, secretKey);
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/SchemaFormat.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/SchemaFormat.java
new file mode 100644
index 000000000..c583d7245
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/SchemaFormat.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
+
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+
+/** schema format type */
+public enum SchemaFormat {
+    JSON("json"),
+    TEXT("text");
+
+    private final String name;
+
+    SchemaFormat(String name) {
+        this.name = name;
+    }
+
+    /** find format */
+    public static SchemaFormat find(String name) {
+        for (SchemaFormat format : values()) {
+            if (format.getName().equals(name)) {
+                return format;
+            }
+        }
+        throw new SeaTunnelJsonFormatException(
+                CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + name);
+    }
+
+    public String getName() {
+        return name;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/StartMode.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/StartMode.java
new file mode 100644
index 000000000..a80be29ef
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/common/StartMode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.common;
+
+/**
+ * Consumer start mode.
+ *
+ * <p>These are the values of the {@code start.mode} option declared in {@code ConsumerConfig},
+ * which describes them as the initial consumption pattern of consumers.
+ */
+public enum StartMode {
+    // presumably: begin at the newest offset of each queue — inferred from the name, confirm
+    CONSUME_FROM_LAST_OFFSET,
+    // presumably: begin at the oldest offset of each queue — inferred from the name, confirm
+    CONSUME_FROM_FIRST_OFFSET,
+    // default value of the start.mode option
+    CONSUME_FROM_GROUP_OFFSETS,
+    // used together with the start.mode.timestamp option
+    CONSUME_FROM_TIMESTAMP,
+    // used together with the start.mode.offsets option
+    CONSUME_FROM_SPECIFIC_OFFSETS,
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
new file mode 100644
index 000000000..eced97260
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/Config.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+
+/**
+ * Option definitions shared by the RocketMq source and sink: name server address, ACL
+ * credentials and data format. {@code ConsumerConfig} and {@code ProducerConfig} extend this
+ * class with their own options.
+ */
+public class Config {
+
+    /** The default field delimiter is "," */
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    /** Address of the RocketMq name server; no default. */
+    public static final Option<String> NAME_SRV_ADDR =
+            Options.key("name.srv.addr")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("RocketMq name server configuration center address.");
+
+    /** Switch for access control; defaults to {@code false}. */
+    public static final Option<Boolean> ACL_ENABLED =
+            Options.key("acl.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, access control is enabled, and access key and secret key need to be "
+                                    + "configured.");
+
+    /** Access key, per its description needed when {@link #ACL_ENABLED} is true. */
+    public static final Option<String> ACCESS_KEY =
+            Options.key("access.key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("When ACL_ENABLED is true, access key cannot be empty.");
+
+    /** Secret key, per its description needed when {@link #ACL_ENABLED} is true. */
+    public static final Option<String> SECRET_KEY =
+            Options.key("secret.key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("When ACL_ENABLED is true, secret key cannot be empty.");
+
+    /**
+     * Data format name; defaults to the name of {@link SchemaFormat#JSON}.
+     *
+     * <p>The description embeds {@link #DEFAULT_FIELD_DELIMITER} so the documented default
+     * separator cannot drift from the actual constant (it previously advertised {@code ", "}
+     * with a trailing space while the constant is {@code ","}).
+     */
+    public static final Option<String> FORMAT =
+            Options.key("format")
+                    .stringType()
+                    .defaultValue(SchemaFormat.JSON.getName())
+                    .withDescription(
+                            "Data format. The default format is json. Optional text format. The default field separator is \""
+                                    + DEFAULT_FIELD_DELIMITER
+                                    + "\". "
+                                    + "If you customize the delimiter, add the \"field.delimiter\" option.");
+
+    /** Custom field delimiter for the data format; no default is declared on the option. */
+    public static final Option<String> FIELD_DELIMITER =
+            Options.key("field.delimiter")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Customize the field delimiter for data format.");
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
new file mode 100644
index 000000000..534b4c8bd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ConsumerConfig.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+
+/**
+ * Option definitions for the RocketMq consumer side.
+ *
+ * <p>Extends {@link Config}, so the connection and format options declared there are available
+ * through this class as well.
+ */
+public class ConsumerConfig extends Config {
+
+    /** Default of {@link #BATCH_SIZE}. */
+    private static final int DEFAULT_BATCH_SIZE = 100;
+
+    /** Default of {@link #POLL_TIMEOUT_MILLIS}. */
+    private static final long DEFAULT_POLL_TIMEOUT_MILLIS = 5000;
+
+    /** Topic name(s) to consume; several topics are separated by a comma. */
+    public static final Option<String> TOPICS =
+            Options.key("topics")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "RocketMq topic name. If there are multiple topics, use , to split, for example: "
+                                    + "\"tpc1,tpc2\".");
+
+    /** Consumer group id; no default. */
+    public static final Option<String> CONSUMER_GROUP =
+            Options.key("consumer.group")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("RocketMq consumer group id.");
+
+    /** Offset-commit switch; defaults to {@code true}. */
+    public static final Option<Boolean> COMMIT_ON_CHECKPOINT =
+            Options.key("commit.on.checkpoint")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "If true, the consumer's offset will be stored in the background periodically.");
+
+    /**
+     * Structure of the consumed data (field names and types).
+     *
+     * <p>NOTE(review): {@code Config} here resolves to this package's {@link Config} option
+     * holder, since no other {@code Config} type is imported — confirm that is the intended
+     * value type.
+     */
+    public static final Option<Config> SCHEMA =
+            Options.key("schema")
+                    .objectType(Config.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "The structure of the data, including field names and field types.");
+
+    /** Initial consumption mode; defaults to {@link StartMode#CONSUME_FROM_GROUP_OFFSETS}. */
+    public static final Option<StartMode> START_MODE =
+            Options.key("start.mode")
+                    .objectType(StartMode.class)
+                    .defaultValue(StartMode.CONSUME_FROM_GROUP_OFFSETS)
+                    .withDescription(
+                            "The initial consumption pattern of consumers,there are several types:\n"
+                                    + "[CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP],[CONSUME_FROM_SPECIFIC_OFFSETS]");
+
+    /** Timestamp that goes with the timestamp start mode; no default. */
+    public static final Option<Long> START_MODE_TIMESTAMP =
+            Options.key("start.mode.timestamp")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("The time required for consumption mode to be timestamp.");
+
+    /** Offsets that go with the specific-offsets start mode; no default. */
+    public static final Option<Config> START_MODE_OFFSETS =
+            Options.key("start.mode.offsets")
+                    .objectType(Config.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "The offset required for consumption mode to be specific offsets.");
+
+    /**
+     * Configuration key to define the consumer's partition discovery interval, in milliseconds.
+     * Defaults to {@code -1}.
+     */
+    public static final Option<Long> KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS =
+            Options.key("partition.discovery.interval.millis")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            "The interval for dynamically discovering topics and partitions.");
+
+    /** Pull batch size of the consumer. */
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch.size")
+                    .intType()
+                    .defaultValue(DEFAULT_BATCH_SIZE)
+                    .withDescription("Rocketmq consumer pull batch size.");
+
+    /** Poll timeout of the consumer, in milliseconds. */
+    public static final Option<Long> POLL_TIMEOUT_MILLIS =
+            Options.key("consumer.poll.timeout.millis")
+                    .longType()
+                    .defaultValue(DEFAULT_POLL_TIMEOUT_MILLIS)
+                    .withDescription("The poll timeout in milliseconds.");
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ProducerConfig.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ProducerConfig.java
new file mode 100644
index 000000000..134a9908f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/config/ProducerConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import java.util.List;
+
+/**
+ * Option definitions for the RocketMq producer side.
+ *
+ * <p>Extends {@link Config}, so the connection and format options declared there are available
+ * through this class as well.
+ */
+public class ProducerConfig extends Config {
+
+    /** Default of {@link #MAX_MESSAGE_SIZE}: 4 MiB expressed in bytes. */
+    public static final int DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024;
+
+    /** Default of {@link #SEND_MESSAGE_TIMEOUT_MILLIS}. */
+    public static final int DEFAULT_SEND_MESSAGE_TIMEOUT_MILLIS = 3_000;
+
+    /** Topic to write to; no default. */
+    public static final Option<String> TOPIC =
+            Options.key("topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("RocketMq topic name. ");
+
+    /** Producer group id; no default. */
+    public static final Option<String> PRODUCER_GROUP =
+            Options.key("producer.group")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("RocketMq producer group id.");
+
+    /** Names of the fields used as the message key; no default. */
+    public static final Option<List<String>> PARTITION_KEY_FIELDS =
+            Options.key("partition.key.fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Configure which fields are used as the key of the RocketMq message.");
+
+    /** Switch for sending transaction messages; defaults to {@code false}. */
+    public static final Option<Boolean> EXACTLY_ONCE =
+            Options.key("exactly.once")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("If true, the transaction message will be sent.");
+
+    /** Switch for synchronous sending; defaults to {@code false}. */
+    public static final Option<Boolean> SEND_SYNC =
+            Options.key("producer.send.sync")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("If true, the message will be sync sent.");
+
+    /** Maximum message body size in bytes. */
+    public static final Option<Integer> MAX_MESSAGE_SIZE =
+            Options.key("max.message.size")
+                    .intType()
+                    .defaultValue(DEFAULT_MAX_MESSAGE_SIZE)
+                    .withDescription("Maximum allowed message body size in bytes.");
+
+    /** Timeout for sending a message. */
+    public static final Option<Integer> SEND_MESSAGE_TIMEOUT_MILLIS =
+            Options.key("send.message.timeout")
+                    .intType()
+                    .defaultValue(DEFAULT_SEND_MESSAGE_TIMEOUT_MILLIS)
+                    .withDescription("Timeout for sending messages.");
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorErrorCode.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorErrorCode.java
new file mode 100644
index 000000000..618ef7b7f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorErrorCode.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+/**
+ * Error codes raised by the RocketMq connector.
+ *
+ * <p>Each constant pairs a stable code of the form {@code ROCKETMQ-NN} with a human-readable
+ * description.
+ */
+public enum RocketMqConnectorErrorCode implements SeaTunnelErrorCode {
+    ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED(
+            "ROCKETMQ-01",
+            "Add a split back to the split enumerator failed, it will only happen when a SourceReader failed"),
+    ADD_SPLIT_CHECKPOINT_FAILED("ROCKETMQ-02", "Add the split checkpoint state to reader failed"),
+    CONSUME_DATA_FAILED("ROCKETMQ-03", "Rocketmq failed to consume data"),
+    CONSUME_THREAD_RUN_ERROR(
+            "ROCKETMQ-04", "Error occurred when the rocketmq consumer thread was running"),
+    PRODUCER_SEND_MESSAGE_ERROR("ROCKETMQ-05", "Rocketmq producer failed to send message"),
+    PRODUCER_START_ERROR("ROCKETMQ-06", "Rocketmq producer failed to start"),
+    CONSUMER_START_ERROR("ROCKETMQ-07", "Rocketmq consumer failed to start"),
+
+    UNSUPPORTED_START_MODE_ERROR("ROCKETMQ-08", "Unsupported start mode"),
+
+    GET_CONSUMER_GROUP_OFFSETS_ERROR(
+            "ROCKETMQ-09", "Failed to get the offsets of the current consumer group"),
+
+    GET_CONSUMER_GROUP_OFFSETS_TIMESTAMP_ERROR(
+            "ROCKETMQ-10", "Failed to search offset through timestamp"),
+
+    // Description fixed: it used to read "min and max topic" although the code is about offsets.
+    GET_MIN_AND_MAX_OFFSETS_ERROR("ROCKETMQ-11", "Failed to get topic min and max offsets"),
+
+    TOPIC_NOT_EXIST_ERROR("ROCKETMQ-12", "Check the topic for errors"),
+
+    CREATE_TOPIC_ERROR("ROCKETMQ-13", "Failed to create topic"),
+    ;
+
+    /** Stable identifier, e.g. {@code ROCKETMQ-01}. */
+    private final String code;
+    /** Human-readable explanation of the failure. */
+    private final String description;
+
+    RocketMqConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    /** @return the stable error code */
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    /** @return the human-readable description */
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+
+    /** Delegates to the default implementation inherited from {@link SeaTunnelErrorCode}. */
+    @Override
+    public String getErrorMessage() {
+        return SeaTunnelErrorCode.super.getErrorMessage();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorException.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorException.java
new file mode 100644
index 000000000..f71c63900
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/exception/RocketMqConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+/**
+ * Runtime exception of the RocketMq connector.
+ *
+ * <p>A thin subclass of {@link SeaTunnelRuntimeException}: every constructor forwards its
+ * arguments unchanged to the matching super constructor.
+ */
+public class RocketMqConnectorException extends SeaTunnelRuntimeException {
+    /**
+     * @param seaTunnelErrorCode error code identifying the failure
+     * @param errorMessage detail message
+     */
+    public RocketMqConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    /**
+     * @param seaTunnelErrorCode error code identifying the failure
+     * @param errorMessage detail message
+     * @param cause underlying exception, preserved for the stack trace
+     */
+    public RocketMqConnectorException(
+            SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    /**
+     * @param seaTunnelErrorCode error code identifying the failure
+     * @param cause underlying exception, preserved for the stack trace
+     */
+    public RocketMqConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/DefaultSeaTunnelRowSerializer.java
new file mode 100644
index 000000000..67b120633
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/DefaultSeaTunnelRowSerializer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import org.apache.rocketmq.common.message.Message;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Default {@link SeaTunnelRowSerializer}: converts a {@link SeaTunnelRow} into a RocketMQ {@link
+ * Message} addressed to a fixed topic.
+ *
+ * <p>The message body is produced by a TEXT or JSON {@link SerializationSchema}. The optional
+ * message key is the JSON serialization of the configured key fields.
+ */
+@Slf4j
+public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {
+    private final String topic;
+    private final SerializationSchema keySerialization;
+    private final SerializationSchema valueSerialization;
+
+    /**
+     * Creates a serializer that never sets a message key.
+     *
+     * @param topic topic every produced message is addressed to
+     * @param seaTunnelRowType type of the rows to serialize
+     * @param format body format (TEXT or JSON)
+     * @param delimiter field delimiter handed to the TEXT schema
+     */
+    public DefaultSeaTunnelRowSerializer(
+            String topic,
+            SeaTunnelRowType seaTunnelRowType,
+            SchemaFormat format,
+            String delimiter) {
+        this(
+                topic,
+                element -> null,
+                createSerializationSchema(seaTunnelRowType, format, delimiter));
+    }
+
+    /**
+     * Creates a serializer whose message key is built from the given row fields.
+     *
+     * @param topic topic every produced message is addressed to
+     * @param keyFieldNames names of the row fields forming the key; {@code null} or empty
+     *     disables the key
+     * @param seaTunnelRowType type of the rows to serialize
+     * @param format body format (TEXT or JSON)
+     * @param delimiter field delimiter handed to the TEXT schema
+     */
+    public DefaultSeaTunnelRowSerializer(
+            String topic,
+            List<String> keyFieldNames,
+            SeaTunnelRowType seaTunnelRowType,
+            SchemaFormat format,
+            String delimiter) {
+        this(
+                topic,
+                createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
+                createSerializationSchema(seaTunnelRowType, format, delimiter));
+    }
+
+    /**
+     * Creates a serializer from already-built key and value schemas.
+     *
+     * @param topic topic every produced message is addressed to
+     * @param keySerialization schema producing the key bytes; may return {@code null} for "no
+     *     key"
+     * @param valueSerialization schema producing the message body
+     */
+    public DefaultSeaTunnelRowSerializer(
+            String topic,
+            SerializationSchema keySerialization,
+            SerializationSchema valueSerialization) {
+        this.topic = topic;
+        this.keySerialization = keySerialization;
+        this.valueSerialization = valueSerialization;
+    }
+
+    /**
+     * Builds the body schema for the requested format.
+     *
+     * @throws SeaTunnelJsonFormatException if the format is neither TEXT nor JSON
+     */
+    private static SerializationSchema createSerializationSchema(
+            SeaTunnelRowType rowType, SchemaFormat format, String delimiter) {
+        switch (format) {
+            case TEXT:
+                return TextSerializationSchema.builder()
+                        .seaTunnelRowType(rowType)
+                        .delimiter(delimiter)
+                        .build();
+            case JSON:
+                return new JsonSerializationSchema(rowType);
+            default:
+                throw new SeaTunnelJsonFormatException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+        }
+    }
+
+    /**
+     * Builds the key schema: a projection of the row onto the key fields, serialized as JSON.
+     *
+     * @return a schema that always yields {@code null} when no key fields are configured,
+     *     otherwise a schema emitting the JSON form of the projected key row
+     */
+    private static SerializationSchema createKeySerializationSchema(
+            List<String> keyFieldNames, SeaTunnelRowType seaTunnelRowType) {
+        if (keyFieldNames == null || keyFieldNames.isEmpty()) {
+            return element -> null;
+        }
+        // Resolve each key field to its position and type in the full row once, up front.
+        int[] keyFieldIndexArr = new int[keyFieldNames.size()];
+        SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()];
+        for (int i = 0; i < keyFieldNames.size(); i++) {
+            String keyFieldName = keyFieldNames.get(i);
+            int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
+            keyFieldIndexArr[i] = rowFieldIndex;
+            keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
+        }
+        SeaTunnelRowType keyType =
+                new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
+        SerializationSchema keySerializationSchema = new JsonSerializationSchema(keyType);
+        // Copies only the key fields of a row into a new, smaller row.
+        Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor =
+                row -> {
+                    Object[] keyFields = new Object[keyFieldIndexArr.length];
+                    for (int i = 0; i < keyFieldIndexArr.length; i++) {
+                        keyFields[i] = row.getField(keyFieldIndexArr[i]);
+                    }
+                    return new SeaTunnelRow(keyFields);
+                };
+        return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
+    }
+
+    /**
+     * Serializes a row into a RocketMQ message for the configured topic.
+     *
+     * @param row row to serialize
+     * @return the message, or {@code null} when the value schema produces no body for the row
+     */
+    @Override
+    public Message serializeRow(SeaTunnelRow row) {
+        byte[] value = valueSerialization.serialize(row);
+        if (value == null) {
+            return null;
+        }
+        byte[] key = keySerialization.serialize(row);
+        // Decode with an explicit charset so the message key does not depend on the JVM's
+        // platform default encoding (the key also drives hash-based queue selection).
+        // NOTE(review): assumes the key schema emits UTF-8 bytes - confirm against
+        // JsonSerializationSchema.
+        String messageKey =
+                key == null ? null : new String(key, java.nio.charset.StandardCharsets.UTF_8);
+        return new Message(topic, null, messageKey, value);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 000000000..6c04d92f7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.rocketmq.common.message.Message;
+
+/**
+ * Converts SeaTunnel rows into RocketMQ messages for the sink.
+ *
+ * <p>NOTE(review): the type parameters {@code K} and {@code V} are not referenced by the method
+ * declared here; presumably they name the key and value representations - confirm with the
+ * implementations.
+ */
+public interface SeaTunnelRowSerializer<K, V> {
+
+    /**
+     * Serializes a {@link SeaTunnelRow} into a RocketMQ {@link Message}.
+     *
+     * @param row the SeaTunnel row to serialize
+     * @return the RocketMQ message built from the row
+     */
+    Message serializeRow(SeaTunnelRow row);
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/ProducerMetadata.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/ProducerMetadata.java
new file mode 100644
index 000000000..34add2fd5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/ProducerMetadata.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Serializable holder for the RocketMQ sink's producer settings.
+ *
+ * <p>Accessors are generated by Lombok's {@code @Data}.
+ */
+@Data
+public class ProducerMetadata implements Serializable {
+    /** Base RocketMQ client configuration handed to the producer sender. */
+    private RocketMqBaseConfiguration configuration;
+    /** Topic that messages are sent to. */
+    private String topic;
+
+    /** Names of the row fields used to build the message key; may be {@code null}. */
+    private List<String> partitionKeyFields;
+    /** Whether the transactional sender is used instead of the non-transactional one. */
+    private boolean exactlyOnce;
+    /** Serialization format of the message body. */
+    private SchemaFormat format;
+
+    /** Field delimiter passed to the serializer. */
+    private String fieldDelimiter;
+
+    /** Whether the non-transactional sender sends synchronously. */
+    private boolean sync;
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqNoTransactionSender.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqNoTransactionSender.java
new file mode 100644
index 000000000..92fa68a39
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqNoTransactionSender.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode.PRODUCER_SEND_MESSAGE_ERROR;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode.PRODUCER_START_ERROR;
+
+/**
+ * {@link RocketMqProducerSender} backed by a plain (non-transactional) {@link DefaultMQProducer}.
+ *
+ * <p>Messages are sent either synchronously or asynchronously, depending on the flag given at
+ * construction. Messages that carry keys are routed with {@link SelectMessageQueueByHash} on
+ * those keys.
+ */
+@Slf4j
+public class RocketMqNoTransactionSender implements RocketMqProducerSender {
+
+    private final DefaultMQProducer rocketMqProducer;
+    private final boolean isSync;
+
+    /**
+     * Creates and starts the underlying producer.
+     *
+     * @param configuration base RocketMQ configuration used to build the producer
+     * @param isSync {@code true} to send synchronously, {@code false} to send with a callback
+     * @throws RocketMqConnectorException if the producer fails to start
+     */
+    public RocketMqNoTransactionSender(RocketMqBaseConfiguration configuration, boolean isSync) {
+        this.isSync = isSync;
+        this.rocketMqProducer = RocketMqAdminUtil.initDefaultMqProducer(configuration);
+        try {
+            this.rocketMqProducer.start();
+        } catch (MQClientException e) {
+            throw new RocketMqConnectorException(PRODUCER_START_ERROR, e);
+        }
+    }
+
+    /**
+     * Sends one message; a {@code null} message is ignored.
+     *
+     * <p>In asynchronous mode a failure reported through the callback is only logged and is not
+     * propagated to the caller.
+     *
+     * @param message message to send, may be {@code null}
+     * @throws RocketMqConnectorException if the send call itself fails or the thread is
+     *     interrupted while sending
+     */
+    @Override
+    public void send(Message message) {
+        if (message == null) {
+            return;
+        }
+        try {
+            if (isSync) {
+                if (StringUtils.isEmpty(message.getKeys())) {
+                    this.rocketMqProducer.send(message);
+                } else {
+                    this.rocketMqProducer.send(
+                            message, new SelectMessageQueueByHash(), message.getKeys());
+                }
+            } else {
+                SendCallback callback =
+                        new SendCallback() {
+                            @Override
+                            public void onSuccess(SendResult sendResult) {
+                                // No-op
+                            }
+
+                            @Override
+                            public void onException(Throwable e) {
+                                log.error("Failed to send data to rocketmq", e);
+                            }
+                        };
+                if (StringUtils.isEmpty(message.getKeys())) {
+                    this.rocketMqProducer.send(message, callback);
+                } else {
+                    this.rocketMqProducer.send(
+                            message, new SelectMessageQueueByHash(), message.getKeys(), callback);
+                }
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up the stack can still observe the
+            // interruption after it has been wrapped in an unchecked exception.
+            Thread.currentThread().interrupt();
+            throw new RocketMqConnectorException(PRODUCER_SEND_MESSAGE_ERROR, e);
+        } catch (MQClientException | RemotingException | MQBrokerException e) {
+            throw new RocketMqConnectorException(PRODUCER_SEND_MESSAGE_ERROR, e);
+        }
+    }
+
+    /** Shuts down the underlying producer. */
+    @Override
+    public void close() throws Exception {
+        if (rocketMqProducer != null) {
+            this.rocketMqProducer.shutdown();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqProducerSender.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqProducerSender.java
new file mode 100644
index 000000000..5deded333
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqProducerSender.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.rocketmq.common.message.Message;
+
+/**
+ * Abstraction over the way the sink hands messages to RocketMQ.
+ *
+ * <p>Extends {@link AutoCloseable} so that implementations can release their resources in
+ * {@code close()}.
+ */
+public interface RocketMqProducerSender extends AutoCloseable {
+
+    /**
+     * Sends a message to RocketMQ.
+     *
+     * @param message the message to send
+     */
+    void send(Message message);
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
new file mode 100644
index 000000000..04ea6bc3e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSink.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.ProducerConfig;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.DEFAULT_FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.NAME_SRV_ADDR;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.SECRET_KEY;
+
+/**
+ * SeaTunnel sink that writes rows to a RocketMQ topic.
+ *
+ * <p>{@link #prepare(Config)} translates the plugin configuration into a {@link
+ * ProducerMetadata}, which is later handed to each {@link RocketMqSinkWriter}.
+ */
+@AutoService(SeaTunnelSink.class)
+public class RocketMqSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+    private static final String DEFAULT_PRODUCER_GROUP = "SeaTunnel-Producer-Group";
+    private SeaTunnelRowType seaTunnelRowType;
+    private ProducerMetadata producerMetadata;
+
+    @Override
+    public String getPluginName() {
+        return "Rocketmq";
+    }
+
+    /**
+     * Validates the plugin configuration and populates the producer metadata.
+     *
+     * @param config plugin configuration; the topic and name server address are mandatory
+     * @throws RocketMqConnectorException if a mandatory option is missing, or ACL is enabled
+     *     without both an access key and a secret key
+     */
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        CheckResult requiredOptions =
+                CheckConfigUtil.checkAllExists(
+                        config, ProducerConfig.TOPIC.key(), NAME_SRV_ADDR.key());
+        if (!requiredOptions.isSuccess()) {
+            String detail =
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SINK, requiredOptions.getMsg());
+            throw new RocketMqConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, detail);
+        }
+
+        producerMetadata = new ProducerMetadata();
+        producerMetadata.setTopic(config.getString(ProducerConfig.TOPIC.key()));
+        producerMetadata.setConfiguration(buildBaseConfiguration(config));
+
+        // Body format defaults to JSON when not configured.
+        producerMetadata.setFormat(
+                config.hasPath(FORMAT.key())
+                        ? SchemaFormat.valueOf(config.getString(FORMAT.key()).toUpperCase())
+                        : SchemaFormat.JSON);
+
+        producerMetadata.setFieldDelimiter(
+                config.hasPath(FIELD_DELIMITER.key())
+                        ? config.getString(FIELD_DELIMITER.key())
+                        : DEFAULT_FIELD_DELIMITER);
+
+        // Partition key fields stay unset (null) unless explicitly configured.
+        if (config.hasPath(ProducerConfig.PARTITION_KEY_FIELDS.key())) {
+            producerMetadata.setPartitionKeyFields(
+                    config.getStringList(ProducerConfig.PARTITION_KEY_FIELDS.key()));
+        }
+
+        producerMetadata.setExactlyOnce(
+                readBoolean(
+                        config,
+                        ProducerConfig.EXACTLY_ONCE.key(),
+                        ProducerConfig.EXACTLY_ONCE.defaultValue()));
+        producerMetadata.setSync(
+                readBoolean(
+                        config,
+                        ProducerConfig.SEND_SYNC.key(),
+                        ProducerConfig.SEND_SYNC.defaultValue()));
+    }
+
+    /** Builds the base producer configuration: name server, ACL, group, size and timeout. */
+    private RocketMqBaseConfiguration buildBaseConfiguration(Config config) {
+        RocketMqBaseConfiguration.Builder builder =
+                RocketMqBaseConfiguration.newBuilder()
+                        .producer()
+                        .namesrvAddr(config.getString(NAME_SRV_ADDR.key()));
+
+        applyAclOptions(config, builder);
+
+        builder.groupId(
+                config.hasPath(ProducerConfig.PRODUCER_GROUP.key())
+                        ? config.getString(ProducerConfig.PRODUCER_GROUP.key())
+                        : DEFAULT_PRODUCER_GROUP);
+
+        if (!config.hasPath(ProducerConfig.MAX_MESSAGE_SIZE.key())) {
+            builder.maxMessageSize(ProducerConfig.MAX_MESSAGE_SIZE.defaultValue());
+        } else {
+            builder.maxMessageSize(config.getInt(ProducerConfig.MAX_MESSAGE_SIZE.key()));
+        }
+
+        if (!config.hasPath(ProducerConfig.SEND_MESSAGE_TIMEOUT_MILLIS.key())) {
+            builder.sendMsgTimeout(ProducerConfig.SEND_MESSAGE_TIMEOUT_MILLIS.defaultValue());
+        } else {
+            builder.sendMsgTimeout(
+                    config.getInt(ProducerConfig.SEND_MESSAGE_TIMEOUT_MILLIS.key()));
+        }
+
+        return builder.build();
+    }
+
+    /**
+     * Applies the ACL switch and credentials to the builder.
+     *
+     * <p>The credentials are only read when the ACL switch itself is present in the config.
+     */
+    private void applyAclOptions(Config config, RocketMqBaseConfiguration.Builder builder) {
+        boolean aclEnabled = ACL_ENABLED.defaultValue();
+        if (config.hasPath(ACL_ENABLED.key())) {
+            aclEnabled = config.getBoolean(ACL_ENABLED.key());
+            boolean hasAccessKey = config.hasPath(ACCESS_KEY.key());
+            boolean hasSecretKey = config.hasPath(SECRET_KEY.key());
+            if (aclEnabled && !(hasAccessKey && hasSecretKey)) {
+                throw new RocketMqConnectorException(
+                        SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
+                        "When ACL_ENABLED true , ACCESS_KEY and SECRET_KEY must be configured");
+            }
+            if (hasAccessKey) {
+                builder.accessKey(config.getString(ACCESS_KEY.key()));
+            }
+            if (hasSecretKey) {
+                builder.secretKey(config.getString(SECRET_KEY.key()));
+            }
+        }
+        builder.aclEnable(aclEnabled);
+    }
+
+    /** Returns the configured boolean at {@code key}, or {@code fallback} when it is absent. */
+    private static boolean readBoolean(Config config, String key, boolean fallback) {
+        return config.hasPath(key) ? config.getBoolean(key) : fallback;
+    }
+
+    @Override
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
+        return this.seaTunnelRowType;
+    }
+
+    /** Creates a writer bound to the prepared producer metadata and the consumed row type. */
+    @Override
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
+            throws IOException {
+        return new RocketMqSinkWriter(producerMetadata, seaTunnelRowType);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
new file mode 100644
index 000000000..efad35dea
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.ProducerConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class RocketMqSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Rocketmq";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(ProducerConfig.TOPIC, Config.NAME_SRV_ADDR)
+                .optional(
+                        ProducerConfig.PRODUCER_GROUP,
+                        ProducerConfig.PARTITION_KEY_FIELDS,
+                        ProducerConfig.EXACTLY_ONCE,
+                        ProducerConfig.SEND_SYNC,
+                        ProducerConfig.MAX_MESSAGE_SIZE,
+                        ProducerConfig.SEND_MESSAGE_TIMEOUT_MILLIS)
+                .build();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
new file mode 100644
index 000000000..a1799f2b1
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqSinkWriter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.SeaTunnelRowSerializer;
+
+import org.apache.rocketmq.common.message.Message;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Sink writer that serializes each {@link SeaTunnelRow} into a RocketMQ {@link Message} and
+ * hands it to a {@link RocketMqProducerSender}.
+ *
+ * <p>A transactional sender is used when exactly-once is enabled in the producer metadata;
+ * otherwise a non-transactional sender is used.
+ */
+public class RocketMqSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+    private final ProducerMetadata producerMetadata;
+    // Parameterized to match getSerializer(...) instead of using the raw type.
+    private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;
+    private final RocketMqProducerSender rocketMqProducerSender;
+
+    /**
+     * Builds the serializer and the sender for this writer.
+     *
+     * @param producerMetadata producer settings prepared by the sink
+     * @param seaTunnelRowType type of the rows this writer receives
+     * @throws RocketMqConnectorException if a configured partition key field is not part of the
+     *     row type
+     */
+    public RocketMqSinkWriter(
+            ProducerMetadata producerMetadata, SeaTunnelRowType seaTunnelRowType) {
+        this.producerMetadata = producerMetadata;
+        this.seaTunnelRowSerializer = getSerializer(seaTunnelRowType);
+        if (producerMetadata.isExactlyOnce()) {
+            this.rocketMqProducerSender =
+                    new RocketMqTransactionSender(producerMetadata.getConfiguration());
+        } else {
+            this.rocketMqProducerSender =
+                    new RocketMqNoTransactionSender(
+                            producerMetadata.getConfiguration(), producerMetadata.isSync());
+        }
+    }
+
+    /**
+     * Serializes the row and passes the resulting message to the sender.
+     *
+     * @param element row to write
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        Message message = seaTunnelRowSerializer.serializeRow(element);
+        rocketMqProducerSender.send(message);
+    }
+
+    /**
+     * Closes the sender.
+     *
+     * @throws RocketMqConnectorException if closing the sender fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (this.rocketMqProducerSender != null) {
+            try {
+                this.rocketMqProducerSender.close();
+            } catch (Exception e) {
+                throw new RocketMqConnectorException(
+                        CommonErrorCode.WRITER_OPERATION_FAILED,
+                        "Close RocketMq sink writer error",
+                        e);
+            }
+        }
+    }
+
+    /** Creates the row serializer from the topic, key fields, format and delimiter. */
+    private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
+            SeaTunnelRowType seaTunnelRowType) {
+        return new DefaultSeaTunnelRowSerializer(
+                producerMetadata.getTopic(),
+                getPartitionKeyFields(seaTunnelRowType),
+                seaTunnelRowType,
+                producerMetadata.getFormat(),
+                producerMetadata.getFieldDelimiter());
+    }
+
+    /**
+     * Returns the configured partition key fields after checking that each one exists in the
+     * row type.
+     *
+     * @return the configured key fields, or an empty list when none are configured
+     * @throws RocketMqConnectorException if a key field is not a field of the row type
+     */
+    private List<String> getPartitionKeyFields(SeaTunnelRowType seaTunnelRowType) {
+        if (producerMetadata.getPartitionKeyFields() == null) {
+            return Collections.emptyList();
+        }
+        List<String> partitionKeyFields = producerMetadata.getPartitionKeyFields();
+        // Check whether the key exists
+        List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+        for (String partitionKeyField : partitionKeyFields) {
+            if (!rowTypeFieldNames.contains(partitionKeyField)) {
+                throw new RocketMqConnectorException(
+                        CommonErrorCode.ILLEGAL_ARGUMENT,
+                        String.format(
+                                "Partition key field not found: %s, rowType: %s",
+                                partitionKeyField, rowTypeFieldNames));
+            }
+        }
+        return partitionKeyFields;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqTransactionSender.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqTransactionSender.java
new file mode 100644
index 000000000..d3a77f06b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqTransactionSender.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode.PRODUCER_SEND_MESSAGE_ERROR;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode.PRODUCER_START_ERROR;
+
+/**
+ * {@link RocketMqProducerSender} that publishes every message through a RocketMQ {@link
+ * TransactionMQProducer}.
+ *
+ * <p>The registered listener answers {@code COMMIT_MESSAGE} both when the local transaction is
+ * executed and when the transaction state is checked.
+ */
+public class RocketMqTransactionSender implements RocketMqProducerSender {
+
+    /** Transaction argument used when the message carries no keys. */
+    private static final String TXN_PARAM = "SeaTunnel-RocketMq";
+
+    private final TransactionMQProducer transactionMQProducer;
+
+    /**
+     * Creates the transactional producer and starts it.
+     *
+     * @param configuration RocketMQ settings used to initialise the producer
+     * @throws RocketMqConnectorException if the producer fails to start
+     */
+    public RocketMqTransactionSender(RocketMqBaseConfiguration configuration) {
+        this.transactionMQProducer =
+                RocketMqAdminUtil.initTransactionMqProducer(
+                        configuration, new AlwaysCommitListener());
+        try {
+            this.transactionMQProducer.start();
+        } catch (MQClientException e) {
+            throw new RocketMqConnectorException(PRODUCER_START_ERROR, e);
+        }
+    }
+
+    /**
+     * Sends the message in a transaction, passing the message keys (or {@link #TXN_PARAM} when
+     * the keys are empty) as the transaction argument.
+     *
+     * @param message message to send
+     * @throws RocketMqConnectorException if the client reports a send failure
+     */
+    @Override
+    public void send(Message message) {
+        String messageKeys = message.getKeys();
+        String transactionArg = StringUtils.isEmpty(messageKeys) ? TXN_PARAM : messageKeys;
+        try {
+            transactionMQProducer.sendMessageInTransaction(message, transactionArg);
+        } catch (MQClientException e) {
+            throw new RocketMqConnectorException(PRODUCER_SEND_MESSAGE_ERROR, e);
+        }
+    }
+
+    /** Shuts the underlying producer down. */
+    @Override
+    public void close() throws Exception {
+        if (transactionMQProducer == null) {
+            return;
+        }
+        this.transactionMQProducer.shutdown();
+    }
+
+    /** Transaction listener that reports every transaction as committed. */
+    private static final class AlwaysCommitListener implements TransactionListener {
+
+        @Override
+        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+            return LocalTransactionState.COMMIT_MESSAGE;
+        }
+
+        @Override
+        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+            return LocalTransactionState.COMMIT_MESSAGE;
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/ConsumerMetadata.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/ConsumerMetadata.java
new file mode 100644
index 000000000..77597ed16
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/ConsumerMetadata.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * RocketMQ consumer metadata: a serializable holder for the consumer settings that {@code
+ * RocketMqSource#prepare} fills in and that the source passes to the readers and split
+ * enumerators it creates. Accessors are generated by Lombok's {@code @Data}.
+ */
+@Data
+public class ConsumerMetadata implements Serializable {
+    // Base client configuration; initialised from a builder with no options set.
+    private RocketMqBaseConfiguration baseConfig = RocketMqBaseConfiguration.newBuilder().build();
+    // Topics to consume.
+    private List<String> topics;
+    // Value of the commit-on-checkpoint option; false unless set otherwise.
+    private boolean enabledCommitCheckpoint = false;
+    // Where consumption starts.
+    private StartMode startMode;
+    // Start offset per message queue; set by the source for CONSUME_FROM_SPECIFIC_OFFSETS.
+    private Map<MessageQueue, Long> specificStartOffsets;
+    // Start timestamp; set by the source for CONSUME_FROM_TIMESTAMP, where it is validated
+    // against System.currentTimeMillis().
+    private Long startOffsetsTimestamp;
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
new file mode 100644
index 000000000..0c0786569
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * Worker that owns one {@link DefaultLitePullConsumer} and runs queued tasks against it.
+ *
+ * <p>The consumer is created and started in the constructor. Tasks added to the queue returned by
+ * {@link #getTasks()} are executed one at a time by {@link #run()}, each receiving the consumer.
+ */
+public class RocketMqConsumerThread implements Runnable {
+    private final DefaultLitePullConsumer consumer;
+    private final ConsumerMetadata metadata;
+    private final LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> tasks;
+
+    /**
+     * Creates and starts the pull consumer described by the given metadata.
+     *
+     * @param metadata consumer settings; the base configuration and the negated
+     *     commit-on-checkpoint flag are passed to the consumer initialisation
+     * @throws RocketMqConnectorException if the consumer fails to start
+     */
+    public RocketMqConsumerThread(ConsumerMetadata metadata) {
+        this.metadata = metadata;
+        this.tasks = new LinkedBlockingQueue<>();
+        this.consumer =
+                RocketMqAdminUtil.initDefaultLitePullConsumer(
+                        this.metadata.getBaseConfig(), !metadata.isEnabledCommitCheckpoint());
+        try {
+            this.consumer.start();
+        } catch (MQClientException e) {
+            // Start rocketmq failed
+            throw new RocketMqConnectorException(
+                    RocketMqConnectorErrorCode.CONSUMER_START_ERROR, e);
+        }
+    }
+
+    /**
+     * Polls the task queue (waiting up to one second per attempt) and runs each task with the
+     * consumer until the current thread is interrupted.
+     *
+     * @throws RocketMqConnectorException if the thread is interrupted while waiting for a task
+     */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                Consumer<DefaultLitePullConsumer> task = tasks.poll(1, TimeUnit.SECONDS);
+                if (task != null) {
+                    task.accept(consumer);
+                }
+            } catch (InterruptedException e) {
+                // poll() clears the interrupt status when it throws; restore it so code further
+                // up the stack can still observe that this thread was asked to stop.
+                Thread.currentThread().interrupt();
+                throw new RocketMqConnectorException(
+                        RocketMqConnectorErrorCode.CONSUME_THREAD_RUN_ERROR, e);
+            }
+        }
+    }
+
+    /**
+     * Returns the queue of pending tasks; callers add tasks to it to have them run by {@link
+     * #run()}.
+     *
+     * @return the live task queue (not a copy)
+     */
+    public LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> getTasks() {
+        return tasks;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
new file mode 100644
index 000000000..947a95a38
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.ConsumerConfig;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACCESS_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.ACL_ENABLED;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.DEFAULT_FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FIELD_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.NAME_SRV_ADDR;
+import static org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config.SECRET_KEY;
+import static org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions.concise;
+
+/**
+ * RocketMq source.
+ *
+ * <p>{@link #prepare(Config)} validates the plugin options and stores them in a {@link
+ * ConsumerMetadata}, which is then handed to the readers and split enumerators created by this
+ * source.
+ */
+@AutoService(SeaTunnelSource.class)
+public class RocketMqSource
+        implements SeaTunnelSource<SeaTunnelRow, RocketMqSourceSplit, RocketMqSourceState>,
+                SupportParallelism {
+
+    /** Consumer group used when the configuration does not provide one. */
+    private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
+
+    private final ConsumerMetadata metadata = new ConsumerMetadata();
+    private JobContext jobContext;
+    private SeaTunnelRowType typeInfo;
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private long discoveryIntervalMillis =
+            ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();
+
+    @Override
+    public String getPluginName() {
+        return "Rocketmq";
+    }
+
+    /**
+     * Reports the source as bounded when the job runs in batch mode and unbounded otherwise.
+     *
+     * @return {@link Boundedness#BOUNDED} for {@link JobMode#BATCH}, else {@link
+     *     Boundedness#UNBOUNDED}
+     */
+    @Override
+    public Boundedness getBoundedness() {
+        return JobMode.BATCH.equals(jobContext.getJobMode())
+                ? Boundedness.BOUNDED
+                : Boundedness.UNBOUNDED;
+    }
+
+    /**
+     * Validates the plugin configuration and populates the consumer metadata, the partition
+     * discovery interval and the deserialization schema.
+     *
+     * @param config plugin configuration
+     * @throws RocketMqConnectorException if a required option is missing, or ACL is enabled
+     *     without both access key and secret key
+     * @throws IllegalArgumentException if the start mode name is unknown, the start timestamp is
+     *     out of range, or the specific start offsets are missing or malformed
+     */
+    @Override
+    public void prepare(Config config) throws PrepareFailException {
+        // check config
+        CheckResult result =
+                CheckConfigUtil.checkAllExists(
+                        config, ConsumerConfig.TOPICS.key(), NAME_SRV_ADDR.key());
+        if (!result.isSuccess()) {
+            throw new RocketMqConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format(
+                            "PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SOURCE, result.getMsg()));
+        }
+        // The topic list reuses DEFAULT_FIELD_DELIMITER as its separator.
+        this.metadata.setTopics(
+                Arrays.asList(
+                        config.getString(ConsumerConfig.TOPICS.key())
+                                .split(DEFAULT_FIELD_DELIMITER)));
+
+        RocketMqBaseConfiguration.Builder baseConfigBuilder =
+                RocketMqBaseConfiguration.newBuilder()
+                        .consumer()
+                        .namesrvAddr(config.getString(NAME_SRV_ADDR.key()));
+        boolean aclEnabled = ACL_ENABLED.defaultValue();
+        if (config.hasPath(ACL_ENABLED.key())) {
+            aclEnabled = config.getBoolean(ACL_ENABLED.key());
+            if (aclEnabled
+                    && (!config.hasPath(ACCESS_KEY.key()) || !config.hasPath(SECRET_KEY.key()))) {
+                throw new RocketMqConnectorException(
+                        SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED,
+                        "When ACL_ENABLED "
+                                + "true , ACCESS_KEY and SECRET_KEY must be configured");
+            }
+            if (config.hasPath(ACCESS_KEY.key())) {
+                baseConfigBuilder.accessKey(config.getString(ACCESS_KEY.key()));
+            }
+            if (config.hasPath(SECRET_KEY.key())) {
+                baseConfigBuilder.secretKey(config.getString(SECRET_KEY.key()));
+            }
+        }
+        baseConfigBuilder.aclEnable(aclEnabled);
+        // config consumer group
+        if (config.hasPath(ConsumerConfig.CONSUMER_GROUP.key())) {
+            baseConfigBuilder.groupId(config.getString(ConsumerConfig.CONSUMER_GROUP.key()));
+        } else {
+            baseConfigBuilder.groupId(DEFAULT_CONSUMER_GROUP);
+        }
+        if (config.hasPath(ConsumerConfig.BATCH_SIZE.key())) {
+            baseConfigBuilder.batchSize(config.getInt(ConsumerConfig.BATCH_SIZE.key()));
+        } else {
+            baseConfigBuilder.batchSize(ConsumerConfig.BATCH_SIZE.defaultValue());
+        }
+        if (config.hasPath(ConsumerConfig.POLL_TIMEOUT_MILLIS.key())) {
+            baseConfigBuilder.pollTimeoutMillis(
+                    config.getInt(ConsumerConfig.POLL_TIMEOUT_MILLIS.key()));
+        } else {
+            baseConfigBuilder.pollTimeoutMillis(ConsumerConfig.POLL_TIMEOUT_MILLIS.defaultValue());
+        }
+        this.metadata.setBaseConfig(baseConfigBuilder.build());
+
+        // commit on checkpoint
+        if (config.hasPath(ConsumerConfig.COMMIT_ON_CHECKPOINT.key())) {
+            this.metadata.setEnabledCommitCheckpoint(
+                    config.getBoolean(ConsumerConfig.COMMIT_ON_CHECKPOINT.key()));
+        } else {
+            this.metadata.setEnabledCommitCheckpoint(
+                    ConsumerConfig.COMMIT_ON_CHECKPOINT.defaultValue());
+        }
+
+        StartMode startMode = ConsumerConfig.START_MODE.defaultValue();
+        if (config.hasPath(ConsumerConfig.START_MODE.key())) {
+            // Locale.ROOT keeps the upper-casing independent of the JVM default locale, so the
+            // enum lookup behaves the same everywhere.
+            startMode =
+                    StartMode.valueOf(
+                            config.getString(ConsumerConfig.START_MODE.key())
+                                    .toUpperCase(Locale.ROOT));
+            switch (startMode) {
+                case CONSUME_FROM_TIMESTAMP:
+                    long startOffsetsTimestamp =
+                            config.getLong(ConsumerConfig.START_MODE_TIMESTAMP.key());
+                    long currentTimestamp = System.currentTimeMillis();
+                    if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > currentTimestamp) {
+                        throw new IllegalArgumentException(
+                                "The offsets timestamp value is smaller than 0 or larger"
+                                        + " than the current time");
+                    }
+                    this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
+                    break;
+                case CONSUME_FROM_SPECIFIC_OFFSETS:
+                    this.metadata.setSpecificStartOffsets(parseSpecificStartOffsets(config));
+                    break;
+                default:
+                    break;
+            }
+        }
+        this.metadata.setStartMode(startMode);
+
+        if (config.hasPath(ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
+            this.discoveryIntervalMillis =
+                    config.getLong(ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
+        }
+
+        // set deserialization
+        setDeserialization(config);
+    }
+
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobContext = jobContext;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return this.typeInfo;
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, RocketMqSourceSplit> createReader(
+            SourceReader.Context readerContext) throws Exception {
+        return new RocketMqSourceReader(this.metadata, deserializationSchema, readerContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> createEnumerator(
+            SourceSplitEnumerator.Context<RocketMqSourceSplit> context) throws Exception {
+        return new RocketMqSourceSplitEnumerator(this.metadata, context, discoveryIntervalMillis);
+    }
+
+    /**
+     * Creates an enumerator for a restored job. The supplied state is not passed on; the
+     * enumerator is built exactly as in {@link #createEnumerator}.
+     */
+    @Override
+    public SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> restoreEnumerator(
+            SourceSplitEnumerator.Context<RocketMqSourceSplit> context,
+            RocketMqSourceState sourceState)
+            throws Exception {
+        return new RocketMqSourceSplitEnumerator(this.metadata, context, discoveryIntervalMillis);
+    }
+
+    /**
+     * Reads the specific start offsets from the configuration.
+     *
+     * <p>Each entry key has the form {@code <topic>-<queueId>}; the text after the last {@code -}
+     * is the queue id and the entry value is the start offset. The resulting message queues are
+     * created without a broker name.
+     *
+     * @param config plugin configuration
+     * @return start offset per message queue
+     * @throws IllegalArgumentException if the offsets option is absent or a key is malformed
+     */
+    private static Map<MessageQueue, Long> parseSpecificStartOffsets(Config config) {
+        if (!config.hasPath(ConsumerConfig.START_MODE_OFFSETS.key())) {
+            throw new IllegalArgumentException(
+                    "start mode is "
+                            + StartMode.CONSUME_FROM_SPECIFIC_OFFSETS
+                            + " but no specific"
+                            + " offsets were specified.");
+        }
+        Config offsets = config.getConfig(ConsumerConfig.START_MODE_OFFSETS.key());
+        ConfigRenderOptions options = concise();
+        String offsetsJson = offsets.root().render(options);
+        Map<MessageQueue, Long> specificStartOffsets = new HashMap<>();
+        ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
+        jsonNodes
+                .fieldNames()
+                .forEachRemaining(
+                        key -> {
+                            int splitIndex = key.lastIndexOf("-");
+                            if (splitIndex <= 0) {
+                                throw new IllegalArgumentException(
+                                        "Invalid start offset key '"
+                                                + key
+                                                + "', expected <topic>-<queueId>");
+                            }
+                            String topic = key.substring(0, splitIndex);
+                            int queueId;
+                            try {
+                                queueId = Integer.parseInt(key.substring(splitIndex + 1));
+                            } catch (NumberFormatException e) {
+                                throw new IllegalArgumentException(
+                                        "Invalid queue id in start offset key '"
+                                                + key
+                                                + "', expected <topic>-<queueId>",
+                                        e);
+                            }
+                            long offset = jsonNodes.get(key).asLong();
+                            specificStartOffsets.put(
+                                    new MessageQueue(topic, null, queueId), offset);
+                        });
+        return specificStartOffsets;
+    }
+
+    /**
+     * Chooses the produced row type and the deserialization schema.
+     *
+     * <p>With a schema option, the row type comes from the configuration and the format option
+     * (JSON when absent) selects a JSON or delimited-text deserializer. Without a schema option,
+     * the simple text schema is used with a text deserializer.
+     *
+     * @param config plugin configuration
+     */
+    private void setDeserialization(Config config) {
+        if (config.hasPath(ConsumerConfig.SCHEMA.key())) {
+            typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+            SchemaFormat format = SchemaFormat.JSON;
+            if (config.hasPath(FORMAT.key())) {
+                format = SchemaFormat.find(config.getString(FORMAT.key()));
+            }
+            switch (format) {
+                case JSON:
+                    deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+                    break;
+                case TEXT:
+                    String delimiter = DEFAULT_FIELD_DELIMITER;
+                    if (config.hasPath(FIELD_DELIMITER.key())) {
+                        delimiter = config.getString(FIELD_DELIMITER.key());
+                    }
+                    deserializationSchema =
+                            TextDeserializationSchema.builder()
+                                    .seaTunnelRowType(typeInfo)
+                                    .delimiter(delimiter)
+                                    .build();
+                    break;
+                default:
+                    throw new SeaTunnelJsonFormatException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
+            }
+        } else {
+            typeInfo = CatalogTableUtil.buildSimpleTextSchema();
+            // NOTE(review): '\002' is presumably chosen because it is unlikely to occur in a
+            // payload, so the message is not split into several fields - confirm.
+            this.deserializationSchema =
+                    TextDeserializationSchema.builder()
+                            .seaTunnelRowType(typeInfo)
+                            .delimiter(String.valueOf('\002'))
+                            .build();
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
new file mode 100644
index 000000000..e1f756f0b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.StartMode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.Config;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.config.ConsumerConfig;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Factory for the RocketMQ source: exposes the factory identifier, the option rule and the source
+ * implementation class.
+ */
+@AutoService(Factory.class)
+public class RocketMqSourceFactory implements TableSourceFactory {
+
+    /** Returns the identifier of this factory, the same string as the source's plugin name. */
+    @Override
+    public String factoryIdentifier() {
+        return "Rocketmq";
+    }
+
+    /**
+     * Declares the options of the source: topics and name server address are required; the start
+     * timestamp and the start offsets are tied to their respective start modes.
+     *
+     * <p>NOTE(review): {@code RocketMqSource#prepare} also reads the ACL options (enabled flag,
+     * access key, secret key) and the field delimiter, none of which are declared here - confirm
+     * whether that omission is intended.
+     */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(ConsumerConfig.TOPICS, Config.NAME_SRV_ADDR)
+                .optional(
+                        Config.FORMAT,
+                        ConsumerConfig.START_MODE,
+                        ConsumerConfig.CONSUMER_GROUP,
+                        ConsumerConfig.COMMIT_ON_CHECKPOINT,
+                        ConsumerConfig.SCHEMA,
+                        ConsumerConfig.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+                        ConsumerConfig.POLL_TIMEOUT_MILLIS,
+                        ConsumerConfig.BATCH_SIZE)
+                // The timestamp option belongs to the CONSUME_FROM_TIMESTAMP start mode.
+                .conditional(
+                        ConsumerConfig.START_MODE,
+                        StartMode.CONSUME_FROM_TIMESTAMP,
+                        ConsumerConfig.START_MODE_TIMESTAMP)
+                // The offsets option belongs to the CONSUME_FROM_SPECIFIC_OFFSETS start mode.
+                .conditional(
+                        ConsumerConfig.START_MODE,
+                        StartMode.CONSUME_FROM_SPECIFIC_OFFSETS,
+                        ConsumerConfig.START_MODE_OFFSETS)
+                .build();
+    }
+
+    /** Returns the source implementation created for this factory. */
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return RocketMqSource.class;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
new file mode 100644
index 000000000..c44307d16
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class RocketMqSourceReader implements SourceReader<SeaTunnelRow, RocketMqSourceSplit> {
+
+    private static final long THREAD_WAIT_TIME = 500L;
+
+    private final SourceReader.Context context;
+    private final ConsumerMetadata metadata;
+    private final Set<RocketMqSourceSplit> sourceSplits;
+    private final Map<Long, Map<MessageQueue, Long>> checkpointOffsets;
+    private final Map<MessageQueue, RocketMqConsumerThread> consumerThreads;
+    private final ExecutorService executorService;
+    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    private final LinkedBlockingQueue<RocketMqSourceSplit> pendingPartitionsQueue;
+
+    private volatile boolean running = false;
+
+    /**
+     * Creates a reader that has no splits yet and is not running until splits are added.
+     *
+     * @param metadata consumer settings handed to every consumer thread
+     * @param deserializationSchema converts message bodies into rows
+     * @param context reader context, used for boundedness and end-of-data signalling
+     */
+    public RocketMqSourceReader(
+            ConsumerMetadata metadata,
+            DeserializationSchema<SeaTunnelRow> deserializationSchema,
+            SourceReader.Context context) {
+        this.context = context;
+        this.metadata = metadata;
+        this.deserializationSchema = deserializationSchema;
+        this.sourceSplits = new HashSet<>();
+        this.pendingPartitionsQueue = new LinkedBlockingQueue<>();
+        this.checkpointOffsets = new ConcurrentHashMap<>();
+        this.consumerThreads = new ConcurrentHashMap<>();
+        // One worker per message queue is submitted to this pool by pollNext.
+        this.executorService =
+                Executors.newCachedThreadPool(
+                        task -> new Thread(task, "RocketMq Source Data Consumer"));
+    }
+
+    /** Does nothing; consumer threads are created lazily in {@code pollNext}. */
+    @Override
+    public void open() throws Exception {
+        // No-op
+    }
+
+    /** Shuts the consumer executor down with {@code shutdownNow()}. */
+    @Override
+    public void close() throws IOException {
+        if (executorService == null) {
+            return;
+        }
+        executorService.shutdownNow();
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        if (!running) {
+            Thread.sleep(THREAD_WAIT_TIME);
+            return;
+        }
+        while (pendingPartitionsQueue.size() != 0) {
+            sourceSplits.add(pendingPartitionsQueue.poll());
+        }
+        sourceSplits.forEach(
+                sourceSplit ->
+                        consumerThreads.computeIfAbsent(
+                                sourceSplit.getMessageQueue(),
+                                s -> {
+                                    RocketMqConsumerThread thread =
+                                            new RocketMqConsumerThread(metadata);
+                                    executorService.submit(thread);
+                                    return thread;
+                                }));
+        sourceSplits.forEach(
+                sourceSplit -> {
+                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+                    try {
+                        consumerThreads
+                                .get(sourceSplit.getMessageQueue())
+                                .getTasks()
+                                .put(
+                                        consumer -> {
+                                            try {
+                                                Set<MessageQueue> messageQueues =
+                                                        Sets.newHashSet(
+                                                                sourceSplit.getMessageQueue());
+                                                consumer.assign(messageQueues);
+                                                if (sourceSplit.getStartOffset() >= 0) {
+                                                    consumer.seek(
+                                                            sourceSplit.getMessageQueue(),
+                                                            sourceSplit.getStartOffset());
+                                                }
+                                                List<MessageExt> records =
+                                                        consumer.poll(
+                                                                metadata.getBaseConfig()
+                                                                        .getPollTimeoutMillis());
+                                                if (records.isEmpty()) {
+                                                    log.warn(
+                                                            "Rocketmq consumer can not pull data, split {}, start offset {}, end offset {}",
+                                                            sourceSplit.getMessageQueue(),
+                                                            sourceSplit.getStartOffset(),
+                                                            sourceSplit.getEndOffset());
+                                                }
+                                                Map<MessageQueue, List<MessageExt>> groupRecords =
+                                                        records.stream()
+                                                                .collect(
+                                                                        Collectors.groupingBy(
+                                                                                record ->
+                                                                                        new MessageQueue(
+                                                                                                record
+                                                                                                        .getTopic(),
+                                                                                                record
+                                                                                                        .getBrokerName(),
+                                                                                                record
+                                                                                                        .getQueueId())));
+                                                for (MessageQueue messageQueue : messageQueues) {
+                                                    if (!groupRecords.containsKey(messageQueue)) {
+                                                        continue;
+                                                    }
+                                                    List<MessageExt> messages =
+                                                            groupRecords.get(messageQueue);
+                                                    for (MessageExt record : messages) {
+                                                        deserializationSchema.deserialize(
+                                                                record.getBody(), output);
+                                                        if (Boundedness.BOUNDED.equals(
+                                                                        context.getBoundedness())
+                                                                && record.getQueueOffset()
+                                                                        >= sourceSplit
+                                                                                .getEndOffset()) {
+                                                            break;
+                                                        }
+                                                    }
+                                                    long lastOffset = -1;
+                                                    if (!messages.isEmpty()) {
+                                                        lastOffset =
+                                                                messages.get(messages.size() - 1)
+                                                                        .getQueueOffset();
+                                                        sourceSplit.setStartOffset(lastOffset);
+                                                    }
+
+                                                    if (lastOffset >= sourceSplit.getEndOffset()) {
+                                                        sourceSplit.setEndOffset(lastOffset);
+                                                    }
+                                                }
+                                            } catch (Exception e) {
+                                                completableFuture.completeExceptionally(e);
+                                            }
+                                            completableFuture.complete(null);
+                                        });
+                    } catch (InterruptedException e) {
+                        throw new RocketMqConnectorException(
+                                RocketMqConnectorErrorCode.CONSUME_DATA_FAILED, e);
+                    }
+                    completableFuture.join();
+                });
+
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            context.signalNoMoreElement();
+        }
+    }
+
+    /**
+     * Snapshots the reader state for a checkpoint.
+     *
+     * <p>Every known split is copied; the start offset of each copy is also remembered under
+     * {@code checkpointId} so that it can be committed when the checkpoint completes.
+     *
+     * @param checkpointId id of the checkpoint being taken
+     * @return copies of all splits currently held by this reader
+     */
+    @Override
+    public List<RocketMqSourceSplit> snapshotState(long checkpointId) throws Exception {
+        List<RocketMqSourceSplit> snapshot =
+                sourceSplits.stream().map(split -> split.copy()).collect(Collectors.toList());
+        Map<MessageQueue, Long> offsetsOfCheckpoint =
+                checkpointOffsets.computeIfAbsent(checkpointId, id -> Maps.newConcurrentMap());
+        snapshot.forEach(
+                split ->
+                        offsetsOfCheckpoint.put(
+                                split.getMessageQueue(), split.getStartOffset()));
+        return snapshot;
+    }
+
+    /**
+     * Queues newly assigned splits and marks the reader as running.
+     *
+     * <p>The splits are picked up by the next {@code pollNext} call.
+     *
+     * @param splits splits assigned to this reader
+     * @throws RocketMqConnectorException if the thread is interrupted while queueing a split
+     */
+    @Override
+    public void addSplits(List<RocketMqSourceSplit> splits) {
+        running = true;
+        for (RocketMqSourceSplit split : splits) {
+            try {
+                pendingPartitionsQueue.put(split);
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller after wrapping it.
+                Thread.currentThread().interrupt();
+                throw new RocketMqConnectorException(
+                        RocketMqConnectorErrorCode.ADD_SPLIT_CHECKPOINT_FAILED, e);
+            }
+        }
+    }
+
+    /** Does nothing; this reader takes no action when told that no more splits will arrive. */
+    @Override
+    public void handleNoMoreSplits() {
+        // No-op
+    }
+
+    /**
+     * Commits the offsets remembered for a completed checkpoint.
+     *
+     * <p>The offsets recorded by {@code snapshotState} for {@code checkpointId} are removed and,
+     * for every message queue, a commit task is queued on that queue's consumer thread. The task
+     * only updates and persists the offset when commit-on-checkpoint is enabled in the metadata.
+     * Queues that have no consumer thread yet are skipped with a warning.
+     *
+     * @param checkpointId id of the checkpoint that completed
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        Map<MessageQueue, Long> messageQueueOffset = checkpointOffsets.remove(checkpointId);
+        if (messageQueueOffset == null) {
+            log.warn("checkpoint {} do not exist or have already been committed.", checkpointId);
+            return;
+        }
+        for (Map.Entry<MessageQueue, Long> entry : messageQueueOffset.entrySet()) {
+            MessageQueue messageQueue = entry.getKey();
+            Long offset = entry.getValue();
+            if (messageQueue == null || offset == null) {
+                continue;
+            }
+            // A snapshot can be taken before pollNext has created the thread for this queue.
+            RocketMqConsumerThread consumerThread = consumerThreads.get(messageQueue);
+            if (consumerThread == null) {
+                log.warn(
+                        "No consumer thread for message queue {}, skip committing offset {}",
+                        messageQueue,
+                        offset);
+                continue;
+            }
+            try {
+                consumerThread
+                        .getTasks()
+                        .put(
+                                consumer -> {
+                                    if (this.metadata.isEnabledCommitCheckpoint()) {
+                                        consumer.getOffsetStore()
+                                                .updateOffset(messageQueue, offset, false);
+                                        consumer.getOffsetStore().persist(messageQueue);
+                                    }
+                                });
+            } catch (InterruptedException e) {
+                // Restore the flag and stop: further put() calls would fail the same way.
+                Thread.currentThread().interrupt();
+                log.error("commit offset failed", e);
+                return;
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplit.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplit.java
new file mode 100644
index 000000000..90fa136c5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplit.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/** define rocketmq source split */
+public class RocketMqSourceSplit implements SourceSplit {
+    private MessageQueue messageQueue;
+    private long startOffset = -1L;
+    private long endOffset = -1L;
+
+    public RocketMqSourceSplit() {}
+
+    public RocketMqSourceSplit(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    public RocketMqSourceSplit(MessageQueue messageQueue, long startOffset, long endOffset) {
+        this.messageQueue = messageQueue;
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+    }
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    public long getEndOffset() {
+        return endOffset;
+    }
+
+    public void setEndOffset(long endOffset) {
+        this.endOffset = endOffset;
+    }
+
+    @Override
+    public String splitId() {
+        return this.messageQueue.getTopic()
+                + "-"
+                + this.messageQueue.getBrokerName()
+                + "-"
+                + this.messageQueue.getQueueId();
+    }
+
+    public RocketMqSourceSplit copy() {
+        return new RocketMqSourceSplit(
+                this.messageQueue, this.getStartOffset(), this.getEndOffset());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
new file mode 100644
index 000000000..9a4912cd9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class RocketMqSourceSplitEnumerator
+        implements SourceSplitEnumerator<RocketMqSourceSplit, RocketMqSourceState> {
+
+    private static final long DEFAULT_DISCOVERY_INTERVAL_MILLIS = 60 * 1000;
+    private final Map<MessageQueue, RocketMqSourceSplit> assignedSplit;
+    private final ConsumerMetadata metadata;
+    private final Context<RocketMqSourceSplit> context;
+    private final Map<MessageQueue, RocketMqSourceSplit> pendingSplit;
+    private ScheduledExecutorService executor;
+    private ScheduledFuture scheduledFuture;
+    // ms
+    private long discoveryIntervalMillis;
+
+    /**
+     * Creates an enumerator with empty assigned and pending split maps.
+     *
+     * @param metadata consumer settings (topics, start mode, base configuration)
+     * @param context enumerator context used to assign splits to readers
+     */
+    public RocketMqSourceSplitEnumerator(
+            ConsumerMetadata metadata, SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
+        this.context = context;
+        this.metadata = metadata;
+        this.pendingSplit = new HashMap<>();
+        this.assignedSplit = new HashMap<>();
+    }
+
+    /**
+     * Creates an enumerator with an explicit discovery interval.
+     *
+     * @param metadata consumer settings (topics, start mode, base configuration)
+     * @param context enumerator context used to assign splits to readers
+     * @param discoveryIntervalMillis interval between discovery runs in milliseconds; a value
+     *     that is not positive is replaced by the default in {@code open()}
+     */
+    public RocketMqSourceSplitEnumerator(
+            ConsumerMetadata metadata,
+            SourceSplitEnumerator.Context<RocketMqSourceSplit> context,
+            long discoveryIntervalMillis) {
+        this(metadata, context);
+        this.discoveryIntervalMillis = discoveryIntervalMillis;
+    }
+
+    /**
+     * Picks the reader that owns a message queue.
+     *
+     * <p>The start index is derived from the topic and broker name, and the queue id is added on
+     * top, so the queues of one topic on one broker are spread round-robin over the readers.
+     * Seeding the start index from the queue id itself would reduce the result to
+     * {@code (32 * queueId) % numReaders}, which is always 0 when {@code numReaders} is 2, 4,
+     * 8, 16 or 32.
+     *
+     * @param messageQueue queue to place
+     * @param numReaders number of readers, must be positive
+     * @return index of the owning reader in {@code [0, numReaders)}
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private static int getSplitOwner(MessageQueue messageQueue, int numReaders) {
+        // String.valueOf keeps a missing topic or broker name from causing an NPE.
+        int queueHash =
+                String.valueOf(messageQueue.getTopic()).hashCode() * 31
+                        + String.valueOf(messageQueue.getBrokerName()).hashCode();
+        // Mask the sign bit so the modulo result is never negative.
+        int startIndex = ((queueHash * 31) & 0x7FFFFFFF) % numReaders;
+        return (startIndex + messageQueue.getQueueId()) % numReaders;
+    }
+
+    /**
+     * Starts the periodic discovery of message queues.
+     *
+     * <p>A discovery interval that is not positive is replaced by
+     * {@code DEFAULT_DISCOVERY_INTERVAL_MILLIS}. Discovery runs on a single daemon thread with a
+     * fixed delay; failures of a run are logged and do not stop later runs.
+     */
+    @Override
+    public void open() {
+        if (discoveryIntervalMillis <= 0) {
+            discoveryIntervalMillis = DEFAULT_DISCOVERY_INTERVAL_MILLIS;
+        }
+        if (discoveryIntervalMillis <= 0) {
+            return;
+        }
+        Runnable discoveryTask =
+                () -> {
+                    try {
+                        discoverySplits();
+                    } catch (Exception e) {
+                        log.error("Dynamic discovery failure:", e);
+                    }
+                };
+        this.executor =
+                Executors.newScheduledThreadPool(
+                        1,
+                        runnable -> {
+                            Thread worker = new Thread(runnable);
+                            worker.setDaemon(true);
+                            worker.setName("RocketMq-messageQueue-dynamic-discovery");
+                            return worker;
+                        });
+        // The first run is delayed by one full interval as well.
+        this.scheduledFuture =
+                executor.scheduleWithFixedDelay(
+                        discoveryTask,
+                        discoveryIntervalMillis,
+                        discoveryIntervalMillis,
+                        TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Performs the initial enumeration: collects the pending splits, applies the configured
+     * start offsets to them and assigns them to the readers, in that order.
+     */
+    @Override
+    public void run() throws Exception {
+        fetchPendingPartitionSplit();
+        setPartitionStartOffset();
+        assignSplit();
+    }
+
+    /**
+     * Cancels the scheduled discovery without interrupting a run in progress and then shuts the
+     * discovery executor down. Does nothing when discovery was never scheduled.
+     */
+    @Override
+    public void close() throws IOException {
+        if (scheduledFuture == null) {
+            return;
+        }
+        scheduledFuture.cancel(false);
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
+    @Override
+    public void addSplitsBack(List<RocketMqSourceSplit> splits, int subtaskId) {
+        if (!splits.isEmpty()) {
+            pendingSplit.putAll(convertToNextSplit(splits));
+            assignSplit();
+        }
+    }
+
+    /**
+     * Re-bases returned splits and indexes them by message queue.
+     *
+     * <p>Each split is modified in place: its start offset becomes its previous end offset plus
+     * one and its end offset becomes the current maximum offset of its queue.
+     *
+     * @param splits splits to convert
+     * @return the same split instances keyed by their message queue
+     * @throws RocketMqConnectorException if any step fails
+     */
+    private Map<MessageQueue, ? extends RocketMqSourceSplit> convertToNextSplit(
+            List<RocketMqSourceSplit> splits) {
+        try {
+            List<MessageQueue> queues =
+                    splits.stream()
+                            .map(split -> split.getMessageQueue())
+                            .collect(Collectors.toList());
+            Map<MessageQueue, Long> latestOffsets =
+                    listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+            for (RocketMqSourceSplit split : splits) {
+                split.setStartOffset(split.getEndOffset() + 1);
+                split.setEndOffset(latestOffsets.get(split.getMessageQueue()));
+            }
+            return splits.stream()
+                    .collect(
+                            Collectors.toMap(
+                                    RocketMqSourceSplit::getMessageQueue, split -> split));
+        } catch (Exception e) {
+            throw new RocketMqConnectorException(
+                    RocketMqConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
+        }
+    }
+
+    /** Returns the number of splits that are pending, i.e. not yet assigned to a reader. */
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    /** Does nothing; this enumerator takes no action on split requests from readers. */
+    @Override
+    public void handleSplitRequest(int subtaskId) {
+        // No-op
+    }
+
+    /**
+     * Assigns the pending splits, if there are any, when a reader registers.
+     *
+     * @param subtaskId id of the reader that registered
+     */
+    @Override
+    public void registerReader(int subtaskId) {
+        if (pendingSplit.isEmpty()) {
+            return;
+        }
+        assignSplit();
+    }
+
+    /**
+     * Snapshots the enumerator state.
+     *
+     * @param checkpointId id of the checkpoint being taken
+     * @return a state holding the set of all splits assigned so far
+     */
+    @Override
+    public RocketMqSourceState snapshotState(long checkpointId) throws Exception {
+        Set<RocketMqSourceSplit> assigned = new HashSet<>(assignedSplit.values());
+        return new RocketMqSourceState(assigned);
+    }
+
+    /** Does nothing; the enumerator has no work to do when a checkpoint completes. */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // No-op
+    }
+
+    /**
+     * One discovery run: collects message queues that are not known yet and assigns them.
+     * Scheduled periodically by {@code open()}.
+     */
+    private void discoverySplits() {
+        fetchPendingPartitionSplit();
+        assignSplit();
+    }
+
+    /**
+     * Adds every split reported by {@code getTopicInfo()} to the pending map, unless its message
+     * queue is already assigned or already pending.
+     */
+    private void fetchPendingPartitionSplit() {
+        for (RocketMqSourceSplit split : getTopicInfo()) {
+            MessageQueue queue = split.getMessageQueue();
+            boolean alreadyKnown =
+                    assignedSplit.containsKey(queue) || pendingSplit.containsKey(queue);
+            if (!alreadyKnown) {
+                pendingSplit.put(queue, split);
+            }
+        }
+    }
+
+    /**
+     * Queries the offsets of the configured topics and turns them into splits.
+     *
+     * @return one split per reported message queue, ranging from the queue's minimum offset to
+     *     its maximum offset
+     */
+    private Set<RocketMqSourceSplit> getTopicInfo() {
+        log.info("Configured topics: {}", metadata.getTopics());
+        List<Map<MessageQueue, TopicOffset>> offsetsPerTopic =
+                RocketMqAdminUtil.offsetTopics(metadata.getBaseConfig(), metadata.getTopics());
+        Set<RocketMqSourceSplit> discovered = Sets.newConcurrentHashSet();
+        for (Map<MessageQueue, TopicOffset> queueOffsets : offsetsPerTopic) {
+            for (Map.Entry<MessageQueue, TopicOffset> queueOffset : queueOffsets.entrySet()) {
+                TopicOffset range = queueOffset.getValue();
+                discovered.add(
+                        new RocketMqSourceSplit(
+                                queueOffset.getKey(),
+                                range.getMinOffset(),
+                                range.getMaxOffset()));
+            }
+        }
+        return discovered;
+    }
+
+    /**
+     * Applies the configured start mode to all pending splits.
+     *
+     * <p>The start offsets are resolved according to the start mode of the metadata and written
+     * to the pending split of each message queue. For the group-offsets mode the first offset of
+     * each queue is used when no group offset is returned at all.
+     *
+     * @throws RocketMqConnectorException if the start mode is not supported
+     */
+    private void setPartitionStartOffset() throws MQClientException {
+        Collection<MessageQueue> queues = pendingSplit.keySet();
+        Map<MessageQueue, Long> startOffsets;
+        switch (metadata.getStartMode()) {
+            case CONSUME_FROM_FIRST_OFFSET:
+                startOffsets = listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+                break;
+            case CONSUME_FROM_LAST_OFFSET:
+                startOffsets = listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+                break;
+            case CONSUME_FROM_TIMESTAMP:
+                startOffsets = listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
+                break;
+            case CONSUME_FROM_GROUP_OFFSETS:
+                startOffsets = listConsumerGroupOffsets(queues);
+                if (startOffsets.isEmpty()) {
+                    startOffsets =
+                            listOffsets(queues, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+                }
+                break;
+            case CONSUME_FROM_SPECIFIC_OFFSETS:
+                startOffsets = metadata.getSpecificStartOffsets();
+                // Fill in broker name
+                setMessageQueueBroker(queues, startOffsets);
+                break;
+            default:
+                throw new RocketMqConnectorException(
+                        RocketMqConnectorErrorCode.UNSUPPORTED_START_MODE_ERROR,
+                        metadata.getStartMode().name());
+        }
+        for (Map.Entry<MessageQueue, Long> startOffset : startOffsets.entrySet()) {
+            MessageQueue queue = startOffset.getKey();
+            // Offsets of queues that are not pending are ignored.
+            if (pendingSplit.containsKey(queue)) {
+                pendingSplit.get(queue).setStartOffset(startOffset.getValue());
+            }
+        }
+    }
+
+    /**
+     * Fills in the broker name of the message queues used as keys of
+     * {@code topicPartitionOffsets}.
+     *
+     * <p>Queues are matched by topic and queue id against {@code topicPartitions}. When several
+     * brokers expose the same topic and queue id, the broker of the first match is used instead
+     * of failing on the duplicate. Keys without a match are left untouched.
+     *
+     * <p>NOTE(review): the keys are modified in place while they are inside the map; confirm the
+     * map is only iterated afterwards and not used for lookups.
+     *
+     * @param topicPartitions discovered message queues that carry a broker name
+     * @param topicPartitionOffsets configured offsets whose keys receive the broker name
+     */
+    private void setMessageQueueBroker(
+            Collection<MessageQueue> topicPartitions,
+            Map<MessageQueue, Long> topicPartitionOffsets) {
+        Map<String, String> brokerByQueue = new HashMap<>();
+        for (MessageQueue discovered : topicPartitions) {
+            // putIfAbsent keeps the first broker when topic and queue id repeat across brokers.
+            brokerByQueue.putIfAbsent(
+                    discovered.getTopic() + "-" + discovered.getQueueId(),
+                    discovered.getBrokerName());
+        }
+        for (MessageQueue configured : topicPartitionOffsets.keySet()) {
+            String broker =
+                    brokerByQueue.get(configured.getTopic() + "-" + configured.getQueueId());
+            if (broker != null) {
+                configured.setBrokerName(broker);
+            }
+        }
+    }
+
+    /**
+     * Resolves one offset per message queue for the given position.
+     *
+     * <p>For the first and last position the minimum or maximum offset of each queue is used;
+     * the topic offsets are only queried for these two positions. For the timestamp position the
+     * offsets are searched by the start timestamp of the metadata. Any other position yields an
+     * empty result.
+     *
+     * @param messageQueues queues to resolve
+     * @param consumeFromWhere position to resolve
+     * @return resolved offsets keyed by message queue
+     */
+    private Map<MessageQueue, Long> listOffsets(
+            Collection<MessageQueue> messageQueues, ConsumeFromWhere consumeFromWhere) {
+        Map<MessageQueue, Long> results = Maps.newConcurrentMap();
+        switch (consumeFromWhere) {
+            case CONSUME_FROM_FIRST_OFFSET:
+            case CONSUME_FROM_LAST_OFFSET:
+                // Only these two positions need the min/max offsets of the topics.
+                Map<MessageQueue, TopicOffset> messageQueueOffsets =
+                        RocketMqAdminUtil.flatOffsetTopics(
+                                metadata.getBaseConfig(), metadata.getTopics());
+                boolean fromFirst =
+                        consumeFromWhere == ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
+                for (MessageQueue messageQueue : messageQueues) {
+                    TopicOffset topicOffset = messageQueueOffsets.get(messageQueue);
+                    results.put(
+                            messageQueue,
+                            fromFirst ? topicOffset.getMinOffset() : topicOffset.getMaxOffset());
+                }
+                break;
+            case CONSUME_FROM_TIMESTAMP:
+                results.putAll(
+                        RocketMqAdminUtil.searchOffsetsByTimestamp(
+                                metadata.getBaseConfig(),
+                                messageQueues,
+                                metadata.getStartOffsetsTimestamp()));
+                break;
+            default:
+                // No-op
+                break;
+        }
+        return results;
+    }
+
+    /**
+     * Looks up the current offsets of the given message queues through the admin utility, using
+     * the base configuration and the topics of the metadata.
+     *
+     * @param messageQueues queues to look up; duplicates are collapsed
+     * @return offsets keyed by message queue
+     */
+    public Map<MessageQueue, Long> listConsumerGroupOffsets(
+            Collection<MessageQueue> messageQueues) {
+        HashSet<MessageQueue> uniqueQueues = new HashSet<>(messageQueues);
+        return RocketMqAdminUtil.currentOffsets(
+                metadata.getBaseConfig(), metadata.getTopics(), uniqueQueues);
+    }
+
+    /**
+     * Assigns all pending splits to their owning readers.
+     *
+     * <p>Every reader receives a list, which may be empty. Pending splits whose message queue is
+     * already assigned are not sent again. Afterwards all pending splits are recorded as
+     * assigned and the pending map is cleared.
+     */
+    private synchronized void assignSplit() {
+        int parallelism = context.currentParallelism();
+        Map<Integer, List<RocketMqSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
+        for (int taskID = 0; taskID < parallelism; taskID++) {
+            readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
+        }
+        for (Map.Entry<MessageQueue, RocketMqSourceSplit> candidate : pendingSplit.entrySet()) {
+            if (assignedSplit.containsKey(candidate.getKey())) {
+                continue;
+            }
+            int owner = getSplitOwner(candidate.getKey(), parallelism);
+            readySplit.get(owner).add(candidate.getValue());
+        }
+        readySplit.forEach(context::assignSplit);
+        assignedSplit.putAll(pendingSplit);
+        pendingSplit.clear();
+    }
diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceState.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceState.java
new file mode 100644
index 000000000..a3d923757
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/** Serializable holder for a set of {@link RocketMqSourceSplit}s. */
+public class RocketMqSourceState implements Serializable {
+
+    /** The splits carried by this state object. */
+    private Set<RocketMqSourceSplit> assignSplits;
+
+    /**
+     * Creates a state object wrapping the given splits.
+     *
+     * @param assignSplits splits to store; the reference is kept as-is, no copy is made
+     */
+    public RocketMqSourceState(Set<RocketMqSourceSplit> assignSplits) {
+        this.assignSplits = assignSplits;
+    }
+
+    /**
+     * Returns the stored splits.
+     *
+     * @return the same set instance that was last passed in
+     */
+    public Set<RocketMqSourceSplit> getAssignSplits() {
+        return this.assignSplits;
+    }
+
+    /**
+     * Replaces the stored splits.
+     *
+     * @param assignSplits new splits to store; the reference is kept as-is, no copy is made
+     */
+    public void setAssignSplits(Set<RocketMqSourceSplit> assignSplits) {
+        this.assignSplits = assignSplits;
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 34e4e850c..e9c7268d9 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -70,6 +70,7 @@
         <module>connector-tdengine</module>
         <module>connector-selectdb-cloud</module>
         <module>connector-hbase</module>
+        <module>connector-rocketmq</module>
     </modules>
 
     <dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 72a4011cc..e9fc30f84 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -480,6 +480,13 @@
                     <scope>provided</scope>
                 </dependency>
 
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-rocketmq</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+
                 <!-- jdbc driver -->
                 <dependency>
                     <groupId>com.aliyun.phoenix</groupId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/pom.xml
similarity index 51%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/pom.xml
index a0e839d77..285df10dd 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/pom.xml
@@ -18,78 +18,46 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-e2e</artifactId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
-
-    <artifactId>seatunnel-connector-v2-e2e</artifactId>
-    <packaging>pom</packaging>
-    <name>SeaTunnel : E2E : Connector V2 :</name>
-
-    <modules>
-        <module>connector-assert-e2e</module>
-        <module>connector-jdbc-e2e</module>
-        <module>connector-redis-e2e</module>
-        <module>connector-cdc-sqlserver-e2e</module>
-        <module>connector-clickhouse-e2e</module>
-        <module>connector-starrocks-e2e</module>
-        <module>connector-influxdb-e2e</module>
-        <module>connector-amazondynamodb-e2e</module>
-        <module>connector-file-local-e2e</module>
-        <module>connector-cassandra-e2e</module>
-        <module>connector-neo4j-e2e</module>
-        <module>connector-http-e2e</module>
-        <module>connector-rabbitmq-e2e</module>
-        <module>connector-kafka-e2e</module>
-        <module>connector-doris-e2e</module>
-        <module>connector-fake-e2e</module>
-        <module>connector-elasticsearch-e2e</module>
-        <module>connector-iotdb-e2e</module>
-        <module>connector-cdc-mysql-e2e</module>
-        <module>connector-iceberg-e2e</module>
-        <module>connector-iceberg-hadoop3-e2e</module>
-        <module>connector-tdengine-e2e</module>
-        <module>connector-datahub-e2e</module>
-        <module>connector-mongodb-e2e</module>
-        <module>connector-hbase-e2e</module>
-        <module>connector-maxcompute-e2e</module>
-    </modules>
+    <artifactId>connector-rocketmq-e2e</artifactId>
 
     <dependencies>
+        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-e2e-common</artifactId>
-            <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-13-starter</artifactId>
+            <artifactId>connector-rocketmq</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-15-starter</artifactId>
+            <artifactId>connector-console</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-2-starter</artifactId>
+            <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-3-starter</artifactId>
+            <artifactId>connector-fake</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-starter</artifactId>
+            <artifactId>seatunnel-transforms-v2</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java
new file mode 100644
index 000000000..94c50ae05
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqContainer.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.rocketmq;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import lombok.SneakyThrows;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+/** rocketmq container */
+public class RocketMqContainer extends GenericContainer<RocketMqContainer> {
+
+    public static final int NAMESRV_PORT = 9876;
+    public static final int BROKER_PORT = 10911;
+    public static final String BROKER_NAME = "broker-a";
+    private static final int DEFAULT_BROKER_PERMISSION = 6;
+
+    public RocketMqContainer(DockerImageName image) {
+        super(image);
+        withExposedPorts(NAMESRV_PORT, BROKER_PORT, BROKER_PORT - 2);
+    }
+
+    @Override
+    protected void configure() {
+        String command = "#!/bin/bash\n";
+        command += "./mqnamesrv &\n";
+        command += "./mqbroker -n localhost:" + NAMESRV_PORT;
+        withCommand("sh", "-c", command);
+    }
+
+    /**
+     * Applies the broker settings needed by the tests through {@code mqadmin} once the
+     * container has started.
+     *
+     * @param containerInfo inspection data of the started container (not used here)
+     * @throws IllegalStateException if the combined command exits with a non-zero code
+     */
+    @Override
+    @SneakyThrows
+    protected void containerIsStarted(InspectContainerResponse containerInfo) {
+        List<String> configCommands = new ArrayList<>();
+        configCommands.add(updateBrokerConfig("autoCreateTopicEnable", true));
+        configCommands.add(updateBrokerConfig("brokerName", BROKER_NAME));
+        configCommands.add(updateBrokerConfig("brokerIP1", getLinuxLocalIp()));
+        configCommands.add(updateBrokerConfig("listenPort", getMappedPort(BROKER_PORT)));
+        configCommands.add(updateBrokerConfig("brokerPermission", DEFAULT_BROKER_PERMISSION));
+
+        // Chained with "&&" so the shell stops at the first failing mqadmin call.
+        ExecResult execResult =
+                execInContainer("/bin/sh", "-c", String.join(" && ", configCommands));
+        if (execResult == null || execResult.getExitCode() == 0) {
+            return;
+        }
+        throw new IllegalStateException(execResult.toString());
+    }
+
+    /**
+     * Builds the {@code mqadmin updateBrokerConfig} command line that sets one broker property
+     * on the broker at {@code localhost:BROKER_PORT}.
+     *
+     * @param key broker property name
+     * @param val value to set; rendered with its string representation
+     * @return the shell command as a single string
+     */
+    private String updateBrokerConfig(final String key, final Object val) {
+        return String.join(
+                " ",
+                "./mqadmin updateBrokerConfig",
+                "-b",
+                "localhost:" + BROKER_PORT,
+                "-k",
+                key,
+                "-v",
+                String.valueOf(val));
+    }
+
+    /**
+     * Returns the name server address as {@code host:port}, using the container host and the
+     * mapped name server port.
+     *
+     * @return name server address
+     */
+    public String getNameSrvAddr() {
+        return getHost() + ":" + getMappedPort(NAMESRV_PORT);
+    }
+
+    /**
+     * Finds an IPv4 address of this machine that is not a loopback address.
+     *
+     * <p>Every address of every network interface is inspected; when several addresses match,
+     * the last one encountered is returned.
+     *
+     * @return the last non-loopback IPv4 address found, or an empty string if there is none
+     * @throws IllegalStateException if the network interfaces cannot be enumerated
+     */
+    public String getLinuxLocalIp() {
+        final Enumeration<NetworkInterface> networkInterfaces;
+        try {
+            networkInterfaces = NetworkInterface.getNetworkInterfaces();
+        } catch (SocketException ex) {
+            // Fail with the cause attached instead of printing the stack trace and silently
+            // continuing with an empty address.
+            throw new IllegalStateException("Failed to enumerate network interfaces", ex);
+        }
+        String ip = "";
+        // Older JDKs may return null instead of an empty enumeration when no interface exists.
+        while (networkInterfaces != null && networkInterfaces.hasMoreElements()) {
+            Enumeration<InetAddress> inetAddresses =
+                    networkInterfaces.nextElement().getInetAddresses();
+            while (inetAddresses.hasMoreElements()) {
+                InetAddress inetAddress = inetAddresses.nextElement();
+                if (!inetAddress.isLoopbackAddress() && inetAddress instanceof Inet4Address) {
+                    ip = inetAddress.getHostAddress();
+                }
+            }
+        }
+        return ip;
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
new file mode 100644
index 000000000..fed95a41d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.rocketmq;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.SchemaFormat;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.engine.common.Constant;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.admin.TopicOffset;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.seatunnel.e2e.connector.rocketmq.RocketMqContainer.NAMESRV_PORT;
+
+@Slf4j
+public class RocketMqIT extends TestSuiteBase implements TestResource {
+
+    private static final String IMAGE = "apache/rocketmq:4.9.4";
+    private static final String ROCKETMQ_GROUP = "SeaTunnel-rocketmq-group";
+    private static final String HOST = "rocketmq-e2e";
+    private static final SchemaFormat DEFAULT_FORMAT = SchemaFormat.JSON;
+    private static final String DEFAULT_FIELD_DELIMITER = ",";
+    private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
+            new SeaTunnelRowType(
+                    new String[] {
+                        "id",
+                        "c_map",
+                        "c_array",
+                        "c_string",
+                        "c_boolean",
+                        "c_tinyint",
+                        "c_smallint",
+                        "c_int",
+                        "c_bigint",
+                        "c_float",
+                        "c_double",
+                        "c_decimal",
+                        "c_bytes",
+                        "c_date",
+                        "c_timestamp"
+                    },
+                    new SeaTunnelDataType[] {
+                        BasicType.LONG_TYPE,
+                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
+                        ArrayType.BYTE_ARRAY_TYPE,
+                        BasicType.STRING_TYPE,
+                        BasicType.BOOLEAN_TYPE,
+                        BasicType.BYTE_TYPE,
+                        BasicType.SHORT_TYPE,
+                        BasicType.INT_TYPE,
+                        BasicType.LONG_TYPE,
+                        BasicType.FLOAT_TYPE,
+                        BasicType.DOUBLE_TYPE,
+                        new DecimalType(2, 1),
+                        PrimitiveByteArrayType.INSTANCE,
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE
+                    });
+    private RocketMqContainer rocketMqContainer;
+    private DefaultMQProducer producer;
+
+    /**
+     * Starts the RocketMQ container with the name server port bound to the same host port,
+     * creates the producer and writes 100 JSON records to {@code test_topic_source}.
+     */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        final String sourceTopic = "test_topic_source";
+        final String nameSrvBinding = NAMESRV_PORT + ":" + NAMESRV_PORT;
+
+        this.rocketMqContainer =
+                new RocketMqContainer(DockerImageName.parse(IMAGE))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+                        .waitingFor(
+                                new HostPortWaitStrategy()
+                                        .withStartupTimeout(Duration.ofMinutes(2)));
+        rocketMqContainer.setPortBindings(Lists.newArrayList(nameSrvBinding));
+        rocketMqContainer.start();
+        log.info("RocketMq container started");
+
+        initProducer();
+
+        log.info("Write 100 records to topic test_topic_source");
+        DefaultSeaTunnelRowSerializer jsonSerializer =
+                new DefaultSeaTunnelRowSerializer(
+                        sourceTopic, SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(jsonSerializer::serializeRow, sourceTopic, 0, 100);
+    }
+
+    /**
+     * Creates the shared producer, points it at the container's name server and starts it.
+     * The instance name is a fresh random UUID.
+     */
+    @SneakyThrows
+    private void initProducer() {
+        DefaultMQProducer mqProducer = new DefaultMQProducer();
+        // Publish the reference before configuring it, so tearDown() can still see it if a
+        // later step fails.
+        this.producer = mqProducer;
+        mqProducer.setNamesrvAddr(rocketMqContainer.getNameSrvAddr());
+        mqProducer.setInstanceName(UUID.randomUUID().toString());
+        mqProducer.setProducerGroup(ROCKETMQ_GROUP);
+        mqProducer.setLanguage(LanguageCode.JAVA);
+        mqProducer.setSendMsgTimeout(15000);
+        mqProducer.start();
+    }
+
+    /**
+     * Shuts down the producer and closes the container. The container is closed even when the
+     * producer shutdown throws.
+     */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (this.producer != null) {
+                this.producer.shutdown();
+            }
+        } finally {
+            // Must run regardless of the producer outcome, otherwise the container leaks.
+            if (this.rocketMqContainer != null) {
+                this.rocketMqContainer.close();
+            }
+        }
+    }
+
+    /**
+     * Runs the fake-to-RocketMQ sink job and checks that 10 messages arrive on {@code
+     * test_topic} and that a message key is a JSON object containing {@code c_map} and {@code
+     * c_string}.
+     */
+    @TestTemplate
+    public void testSinkRocketMq(TestContainer container) throws IOException, InterruptedException {
+
+        Container.ExecResult execResult =
+                container.executeJob("/rocketmq-sink_fake_to_rocketmq.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        String topicName = "test_topic";
+        Map<String, String> data = getRocketMqConsumerData(topicName);
+        // Check the count first: on an empty result the iterator access below would fail with
+        // an unrelated NoSuchElementException instead of a meaningful assertion message.
+        Assertions.assertEquals(10, data.size());
+
+        ObjectMapper objectMapper = new ObjectMapper();
+        String key = data.keySet().iterator().next();
+        ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
+        Assertions.assertTrue(objectNode.has("c_map"));
+        Assertions.assertTrue(objectNode.has("c_string"));
+    }
+
+    /**
+     * Runs the job defined in {@code rocketmq-text-sink_fake_to_rocketmq.conf} and checks that
+     * 10 messages can be read back from {@code test_text_topic}.
+     */
+    @TestTemplate
+    public void testTextFormatSinkRocketMq(TestContainer container)
+            throws IOException, InterruptedException {
+        final String jobConfig = "/rocketmq-text-sink_fake_to_rocketmq.conf";
+        Container.ExecResult jobResult = container.executeJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+
+        Map<String, String> consumed = getRocketMqConsumerData("test_text_topic");
+        Assertions.assertEquals(10, consumed.size());
+    }
+
+    /**
+     * Writes 100 text-format records to {@code test_topic_text}, then runs the job defined in
+     * {@code rocketmq-source_text_to_console.conf} and expects exit code 0.
+     */
+    @TestTemplate
+    public void testSourceRocketMqTextToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        final String topic = "test_topic_text";
+        DefaultSeaTunnelRowSerializer textSerializer =
+                new DefaultSeaTunnelRowSerializer(
+                        topic, SEATUNNEL_ROW_TYPE, SchemaFormat.TEXT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(textSerializer::serializeRow, topic, 0, 100);
+
+        Container.ExecResult jobResult =
+                container.executeJob("/rocketmq-source_text_to_console.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
+    /**
+     * Writes 100 records in the default format to {@code test_topic_json}, then runs the job
+     * defined in {@code rocketmq-source_json_to_console.conf} and expects exit code 0.
+     */
+    @TestTemplate
+    public void testSourceRocketMqJsonToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        final String topic = "test_topic_json";
+        DefaultSeaTunnelRowSerializer jsonSerializer =
+                new DefaultSeaTunnelRowSerializer(
+                        topic, SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(jsonSerializer::serializeRow, topic, 0, 100);
+
+        Container.ExecResult jobResult =
+                container.executeJob("/rocketmq-source_json_to_console.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
+    /**
+     * Runs the job defined in {@code rocketmq_source_latest_to_console.conf} and expects exit
+     * code 0.
+     */
+    @TestTemplate
+    public void testRocketMqLatestToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        final String jobConfig = "/rocketmq/rocketmq_source_latest_to_console.conf";
+        Container.ExecResult jobResult = container.executeJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
+    /**
+     * Runs the job defined in {@code rocketmq_source_earliest_to_console.conf} and expects exit
+     * code 0.
+     */
+    @TestTemplate
+    public void testRocketMqEarliestToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        final String jobConfig = "/rocketmq/rocketmq_source_earliest_to_console.conf";
+        Container.ExecResult jobResult = container.executeJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
+    /**
+     * Runs the job defined in {@code rocketmq_source_specific_offsets_to_console.conf} and
+     * expects exit code 0.
+     */
+    @TestTemplate
+    public void testRocketMqSpecificOffsetsToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        final String jobConfig = "/rocketmq/rocketmq_source_specific_offsets_to_console.conf";
+        Container.ExecResult jobResult = container.executeJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
+    /**
+     * Runs the job defined in {@code rocketmq_source_timestamp_to_console.conf} and expects
+     * exit code 0.
+     */
+    @TestTemplate
+    public void testRocketMqTimestampToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        final String jobConfig = "/rocketmq/rocketmq_source_timestamp_to_console.conf";
+        Container.ExecResult jobResult = container.executeJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
+    /**
+     * Writes records with ids 100 (inclusive) to 150 (exclusive) to {@code test_topic_group}
+     * in the default format, then runs the group-offset source job.
+     */
+    @TestTemplate
+    public void testSourceRocketMqStartConfig(TestContainer container)
+            throws IOException, InterruptedException {
+        final String topic = "test_topic_group";
+        DefaultSeaTunnelRowSerializer jsonSerializer =
+                new DefaultSeaTunnelRowSerializer(
+                        topic, SEATUNNEL_ROW_TYPE, DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(jsonSerializer::serializeRow, topic, 100, 150);
+        testRocketMqGroupOffsetsToConsole(container);
+    }
+
+    /**
+     * Runs the job defined in {@code rocketmq_source_group_offset_to_console.conf} and expects
+     * exit code 0. Not a test template itself; it is invoked from {@code
+     * testSourceRocketMqStartConfig}.
+     */
+    public void testRocketMqGroupOffsetsToConsole(TestContainer container)
+            throws IOException, InterruptedException {
+        final String jobConfig = "/rocketmq/rocketmq_source_group_offset_to_console.conf";
+        Container.ExecResult jobResult = container.executeJob(jobConfig);
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+    }
+
+    /**
+     * Sends one message per id in {@code [start, end)} to queue 0 of the given topic on the
+     * test broker. Each row matches {@code SEATUNNEL_ROW_TYPE}; only the first field (the id)
+     * varies, apart from the date/time fields which take the current time.
+     *
+     * @param converter turns a row into the message to send
+     * @param topic destination topic
+     * @param start first id, inclusive
+     * @param end last id, exclusive
+     */
+    @SneakyThrows
+    @SuppressWarnings("checkstyle:Indentation")
+    private void generateTestData(
+            ProducerRecordConverter converter, String topic, int start, int end) {
+        // The target queue is the same for every record, so build it once.
+        MessageQueue targetQueue = new MessageQueue(topic, RocketMqContainer.BROKER_NAME, 0);
+        for (int i = start; i < end; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                Long.valueOf(i),
+                                Collections.singletonMap("key", Short.parseShort("1")),
+                                new Byte[] {Byte.parseByte("1")},
+                                "string",
+                                Boolean.FALSE,
+                                Byte.parseByte("1"),
+                                Short.parseShort("1"),
+                                Integer.parseInt("1"),
+                                Long.parseLong("1"),
+                                Float.parseFloat("1.1"),
+                                Double.parseDouble("1.1"),
+                                BigDecimal.valueOf(11, 1),
+                                // Explicit charset: do not depend on the platform default.
+                                "test".getBytes(StandardCharsets.UTF_8),
+                                LocalDate.now(),
+                                LocalDateTime.now()
+                            });
+            Message message = converter.convert(row);
+            producer.send(message, targetQueue);
+        }
+    }
+
+    /**
+     * Reads the messages currently available on the given topic.
+     *
+     * <p>Each queue is positioned at the offset reported by {@code
+     * RocketMqAdminUtil.currentOffsets}, or at the queue's minimum offset when no offset is
+     * reported for it. Polling stops at the first empty poll, and offsets are committed after
+     * every non-empty batch. The consumer is always shut down, even when reading fails
+     * part-way.
+     *
+     * @param topicName topic to read from
+     * @return message bodies decoded as UTF-8, keyed by the message keys
+     */
+    @SneakyThrows
+    private Map<String, String> getRocketMqConsumerData(String topicName) {
+        Map<String, String> data = new HashMap<>();
+        DefaultLitePullConsumer consumer =
+                RocketMqAdminUtil.initDefaultLitePullConsumer(newConfiguration(), false);
+        try {
+            consumer.start();
+            // assign
+            Map<MessageQueue, TopicOffset> queueOffsets =
+                    RetryUtils.retryWithException(
+                            () -> {
+                                return RocketMqAdminUtil.offsetTopics(
+                                                newConfiguration(), Lists.newArrayList(topicName))
+                                        .get(0);
+                            },
+                            new RetryUtils.RetryMaterial(
+                                    Constant.OPERATION_RETRY_TIME,
+                                    false,
+                                    exception -> exception instanceof RocketMqConnectorException,
+                                    Constant.OPERATION_RETRY_SLEEP));
+            consumer.assign(queueOffsets.keySet());
+            // seek to offset
+            Map<MessageQueue, Long> currentOffsets =
+                    RocketMqAdminUtil.currentOffsets(
+                            newConfiguration(),
+                            Lists.newArrayList(topicName),
+                            queueOffsets.keySet());
+            for (MessageQueue mq : queueOffsets.keySet()) {
+                long currentOffset =
+                        currentOffsets.containsKey(mq)
+                                ? currentOffsets.get(mq)
+                                : queueOffsets.get(mq).getMinOffset();
+                consumer.seek(mq, currentOffset);
+            }
+            while (true) {
+                List<MessageExt> messages = consumer.poll(5000);
+                if (messages.isEmpty()) {
+                    break;
+                }
+                for (MessageExt message : messages) {
+                    data.put(
+                            message.getKeys(),
+                            new String(message.getBody(), StandardCharsets.UTF_8));
+                }
+                consumer.commitSync();
+            }
+        } finally {
+            // Release the consumer on every path, including a failing seek/poll/commit.
+            consumer.shutdown();
+        }
+        log.info("Consumer {} data total {}", topicName, data.size());
+        return data;
+    }
+
+    /**
+     * Builds a base configuration for the test container: ACL disabled, the shared test group
+     * id, and the container's name server address.
+     *
+     * @return a newly built configuration
+     */
+    public RocketMqBaseConfiguration newConfiguration() {
+        final String nameSrvAddr = rocketMqContainer.getNameSrvAddr();
+        return RocketMqBaseConfiguration.newBuilder()
+                .namesrvAddr(nameSrvAddr)
+                .groupId(ROCKETMQ_GROUP)
+                .aclEnable(false)
+                .build();
+    }
+
+    /** Converts a {@link SeaTunnelRow} into the RocketMQ {@link Message} to be sent. */
+    @FunctionalInterface
+    interface ProducerRecordConverter {
+        /**
+         * Converts one row.
+         *
+         * @param row row to convert
+         * @return the message representing the row
+         */
+        Message convert(SeaTunnelRow row);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/log4j2-test.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..e0bfc5a7b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/log4j2-test.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+
+# Root logger level. INFO is convenient for debugging purposes;
+# set it to OFF to avoid flooding the build logs.
+rootLogger.level=INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
+
+
+logger.testcontainers.name=org.testcontainers
+logger.testcontainers.level=INFO
+
+logger.dockerjava.name=com.github.dockerjava
+logger.dockerjava.level=INFO
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-sink_fake_to_rocketmq.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-sink_fake_to_rocketmq.conf
new file mode 100644
index 000000000..ca21a11ce
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-sink_fake_to_rocketmq.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topic = "test_topic"
+    partition.key.fields = ["c_map","c_string"]
+    producer.send.sync = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
new file mode 100644
index 000000000..40f4f1f2d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_json_to_console.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topics = "test_topic_json"
+    result_table_name = "rocketmq_table"
+    schema = {
+      fields {
+           id = bigint
+           c_map = "map<string, smallint>"
+           c_array = "array<tinyint>"
+           c_string = string
+           c_boolean = boolean
+           c_tinyint = tinyint
+           c_smallint = smallint
+           c_int = int
+           c_bigint = bigint
+           c_float = float
+           c_double = double
+           c_decimal = "decimal(2, 1)"
+           c_bytes = bytes
+           c_date = date
+           c_timestamp = timestamp
+      }
+    }
+  }
+
+}
+
+transform {
+}
+
+sink {
+  Console {}
+  Assert {
+     rules =
+       {
+         field_rules = [
+             {
+                 field_name = id
+                 field_type = long
+                 field_value = [
+                     {
+                         rule_type = NOT_NULL
+                     },
+                     {
+                         rule_type = MIN
+                         rule_value = 0
+                     },
+                     {
+                         rule_type = MAX
+                         rule_value = 99
+                     }
+                 ]
+             }
+         ]
+       }
+
+   }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
new file mode 100644
index 000000000..d04cda5b4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_text_to_console.conf
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    # You can set spark configuration here
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topics = "test_topic_text"
+    result_table_name = "rocketmq_table"
+    schema = {
+      fields {
+           id = bigint
+           c_map = "map<string, smallint>"
+           c_array = "array<tinyint>"
+           c_string = string
+           c_boolean = boolean
+           c_tinyint = tinyint
+           c_smallint = smallint
+           c_int = int
+           c_bigint = bigint
+           c_float = float
+           c_double = double
+           c_decimal = "decimal(2, 1)"
+           c_bytes = bytes
+           c_date = date
+           c_timestamp = timestamp
+      }
+    }
+    format = text
+    # The default field delimiter is ","
+    field_delimiter = ","
+  }
+}
+
+transform {
+}
+
+sink {
+  Console {}
+  Assert {
+     rules = {
+         field_rules = [
+ 		   {
+             field_name = id
+             field_type = long
+             field_value = [
+                 {
+                     rule_type = NOT_NULL
+                 },
+                 {
+                     rule_type = MIN
+                     rule_value = 0
+                 },
+                 {
+                     rule_type = MAX
+                     rule_value = 99
+                 }
+             ]
+           }
+        ]
+     }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-text-sink_fake_to_rocketmq.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-text-sink_fake_to_rocketmq.conf
new file mode 100644
index 000000000..935936dc3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-text-sink_fake_to_rocketmq.conf
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topic = "test_text_topic"
+    format = text
+    partition.key.fields = ["c_map","c_string"]
+    producer.send.sync = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
new file mode 100644
index 000000000..07e3ad2bf
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_earliest_to_console.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  job.mode = "BATCH"
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topics = "test_topic_source"
+    result_table_name = "rocketmq_table"
+    format = json
+    start.mode = "CONSUME_FROM_FIRST_OFFSET"
+    schema = {
+       fields {
+            id = bigint
+       }
+     }
+  }
+}
+
+transform {
+ }
+
+sink  {
+       Console {}
+       Assert {
+           rules = {
+               field_rules = [
+                  {
+                    field_name = id
+                    field_type = long
+                    field_value = [
+                             {
+                                 rule_type = MIN
+                                 rule_value = 0
+                             },
+                             {
+                                 rule_type = MAX
+                                 rule_value = 99
+                             }
+                     ]
+                  }
+               ]
+           }
+       }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
new file mode 100644
index 000000000..ce881c0e9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_group_offset_to_console.conf
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topics = "test_topic_group"
+    result_table_name = "rocketmq_table"
+    format = json
+    start.mode = "CONSUME_FROM_GROUP_OFFSETS"
+    schema = {
+       fields {
+            id = bigint
+       }
+     }
+  }
+}
+
+transform {
+ }
+
+sink  {
+       Console {}
+       Assert {
+            rules = {
+                  field_rules = [
+        	      	  {
+                         field_name = id
+                         field_type = long
+                         field_value = [
+
+                             {
+                                 rule_type = MIN
+                                 rule_value = 100
+                             },
+                             {
+                                 rule_type = MAX
+                                 rule_value = 149
+                             }
+                         ]
+                      }
+                  ]
+             }
+       }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
new file mode 100644
index 000000000..65eae9194
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_latest_to_console.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topics = "test_topic_source"
+    result_table_name = "rocketmq_table"
+    format = json
+    start.mode = "CONSUME_FROM_LAST_OFFSET"
+    schema = {
+       fields {
+            id = bigint
+       }
+     }
+  }
+}
+
+transform {
+}
+
+sink  {
+       Console {}
+       Assert {
+           rules = {
+               field_rules = [
+        	       {
+                       field_name = id
+                       field_type = long
+                       field_value = [
+                             {
+                                 rule_type = MIN
+                                 rule_value = 99
+                             },
+                             {
+                                 rule_type = MAX
+                                 rule_value = 99
+                             }
+                        ]
+                   }
+               ]
+           }
+      }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
new file mode 100644
index 000000000..7b33f49c4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_specific_offsets_to_console.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topics = "test_topic_source"
+    result_table_name = "rocketmq_table"
+    # The format option is optional and defaults to json
+    format = json
+    start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
+    schema = {
+       fields {
+            id = bigint
+       }
+     }
+
+    start.mode.offsets = {
+                test_topic_source-0 = 50
+             }
+  }
+}
+
+transform {
+}
+
+sink  {
+       Console {}
+       Assert {
+             rules = {
+                   field_rules = [
+        	      	   {
+                         field_name = id
+                         field_type = long
+                         field_value = [
+                             {
+                                 rule_type = MIN
+                                 rule_value = 50
+                             },
+                             {
+                                 rule_type = MAX
+                                 rule_value = 99
+                             }
+                         ]
+                      }
+                   ]
+             }
+       }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
new file mode 100644
index 000000000..2f55a8005
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq/rocketmq_source_timestamp_to_console.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topics = "test_topic_source"
+    result_table_name = "rocketmq_table"
+    # The format option is optional and defaults to json
+    format = json
+    start.mode = "CONSUME_FROM_TIMESTAMP"
+    schema = {
+       fields {
+            id = bigint
+       }
+     }
+     start.mode.timestamp = 1667179890315
+  }
+}
+
+transform {
+}
+
+sink  {
+       Console {}
+       Assert {
+            rules =
+                {
+                   field_rules = [
+        	      	   {
+                         field_name = id
+                         field_type = long
+                         field_value = [
+
+                             {
+                                 rule_type = MIN
+                                 rule_value = 0
+                             },
+                             {
+                                 rule_type = MAX
+                                 rule_value = 99
+                             }
+                         ]
+                      }
+                   ]
+                }
+            }
+      }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index a0e839d77..bce102766 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -53,6 +53,7 @@
         <module>connector-mongodb-e2e</module>
         <module>connector-hbase-e2e</module>
         <module>connector-maxcompute-e2e</module>
+        <module>connector-rocketmq-e2e</module>
     </modules>
 
     <dependencies>