You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/04/15 05:55:30 UTC

[rocketmq-connect] branch master updated: [ISSUE #11] Add mns source task (#1)

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 07aa6e6  [ISSUE #11] Add mns source task (#1)
07aa6e6 is described below

commit 07aa6e6d613164840dd329a08af1744c155d623e
Author: zhaohai <33...@users.noreply.github.com>
AuthorDate: Fri Apr 15 13:55:25 2022 +0800

    [ISSUE #11] Add mns source task (#1)
    
    * add mns task
    
    * add log
    
    * update log4j
    
    * add constant
    
    * update project structure
    
    * update pom
    
    * update log4j
    
    * add description
    
    * delete files
    
    * add validate queue name and duplicate code
    
    * delete cloud events
    
    * update group path
    
    * add body transform
    
    * update pom
    
    * delete param
    
    * delete param
    
    * add put test
---
 connectors/aliyun/rocketmq-connect-mns/README.md   |  48 +++++
 connectors/aliyun/rocketmq-connect-mns/pom.xml     | 194 +++++++++++++++++++++
 .../mns/source/AbstractMNSRecordConvert.java       |  46 +++++
 .../connect/mns/source/MNSRecordConverImpl.java    |  47 +++++
 .../connect/mns/source/MNSSourceConnector.java     |  94 ++++++++++
 .../rocketmq/connect/mns/source/MNSSourceTask.java | 155 ++++++++++++++++
 .../connect/mns/source/constant/MNSConstant.java   |  12 ++
 .../connect/mns/source/utils/AliyunMnsUtil.java    |  21 +++
 .../connect/mns/source/MNSSourceConnectorTest.java |  53 ++++++
 9 files changed, 670 insertions(+)

diff --git a/connectors/aliyun/rocketmq-connect-mns/README.md b/connectors/aliyun/rocketmq-connect-mns/README.md
new file mode 100644
index 0000000..8c2effd
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/README.md
@@ -0,0 +1,48 @@
+# rocketmq-connect-mns
+* **rocketmq-connect-mns** 说明
+```
+It is responsible for obtaining data from the message service MNS and sending it to rocketmq through producer.
+```
+
+## rocketmq-connect-mns 打包
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## rocketmq-connect-mns 启动
+
+* **mns-source-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-mns-source-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}",queueName”:"${queueName}","accountId":"${accountId}","batchSize":"${batchSize}","isBase64Decode":"${isBase64Decode}"}
+```
+
+例子
+
+```
+http://localhost:8081/connectors/mnsConnectorSource?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
+"connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","accountEndpoint":"xxxx","queueName":"xxxx",
+"accountId":"xxxx","batchSize":"8","isBase64Decode":"true"}
+```
+
+>**注:** `rocketmq-mns-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-mns 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-mns-connector-name}/stop
+```
+
+## rocketmq-connect-mns 参数说明
+* **mns-source-connector 参数说明**
+
+| KEY             | TYPE    | Must be filled | Description             | Example  
+|-----------------|---------|----------------|-------------------------|----------|
+| accessKeyId     | String  | YES            | 阿里云身份验证,在阿里云用户信息管理控制台获取 | xxxx     |
+| accessKeySecret | String  | YES            | 阿里云身份验证,在阿里云用户信息管理控制台获取 | xxxx     |
+| accountEndpoint | String  | YES            | 阿里云MNS官方接入点             | xxxx     |
+| queueName       | String  | YES            | 队列名称                    | xxxx     |
+| accountId       | String  | YES            | 阿里云yourAccountId        | 10000000 |
+| batchSize       | Integer | NO            | 批量接受消息数量                | 8        |
+| isBase64Decode  | String  | NO             | 是否开启Base64解码            | true     |
diff --git a/connectors/aliyun/rocketmq-connect-mns/pom.xml b/connectors/aliyun/rocketmq-connect-mns/pom.xml
new file mode 100644
index 0000000..c41c53a
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/pom.xml
@@ -0,0 +1,194 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-mns</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>connect-mns</name>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <slf4j.version>1.7.7</slf4j.version>
+        <logback.version>1.0.13</logback.version>
+        <openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>
+        <aliyun-sdk-mns.version>1.1.9.1</aliyun-sdk-mns.version>
+        <gson.version>2.9.0</gson.version>
+        <junit.version>4.12</junit.version>
+        <assertj.version>3.22.0</assertj.version>
+        <mockito.version>4.0.0</mockito.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.rocketmq.connect.mns.source.MNSSourceConnector</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.aliyun.mns</groupId>
+            <artifactId>aliyun-sdk-mns</artifactId>
+            <version>${aliyun-sdk-mns.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>${gson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>${openmessaging-connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/AbstractMNSRecordConvert.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/AbstractMNSRecordConvert.java
new file mode 100644
index 0000000..cd0c0de
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/AbstractMNSRecordConvert.java
@@ -0,0 +1,46 @@
+package org.apache.rocketmq.connect.mns.source;
+
+import com.aliyun.mns.model.Message;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.RecordPosition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractMNSRecordConvert {
+
+    protected static final String KEY_QUEUE_NAME = "MNS:QueueName";
+    protected static final String KEY_RECEIPT_HANDLE = "ReceiptHandle";
+    protected static final String KEY_MESSAGE_ID = "MessageId";
+
+    protected static final String ACS_MNS= "acs:mns";
+    protected static final String MNS_QUEUE_SEND_MESSAGE = "mns:Queue:SendMessage";
+
+    protected List<ConnectRecord> toConnectRecord(String regionId, String accountId, String queueName, Message popMsg, boolean isBase64Secode) {
+        RecordPosition recordPosition = buildRecordPosition(queueName, popMsg);
+        ConnectRecord connectRecord = new ConnectRecord(recordPosition.getPartition(), recordPosition.getOffset(), popMsg.getEnqueueTime().getTime());
+        connectRecord.addExtension(KEY_RECEIPT_HANDLE, popMsg.getReceiptHandle());
+        connectRecord.addExtension(KEY_MESSAGE_ID, popMsg.getMessageId());
+        List<ConnectRecord> connectRecordList = new ArrayList<>();
+        connectRecordList.add(connectRecord);
+        fillCloudEventsKey(connectRecord, regionId, accountId, queueName, popMsg, isBase64Secode);
+        return connectRecordList;
+    }
+
+    protected abstract void fillCloudEventsKey(ConnectRecord connectRecord, String regionId, String accountId, String queueName, Message popMsg, boolean isBase64Secode);
+
+    protected static RecordPosition buildRecordPosition(String queueName, Message popMsg) {
+        Map<String, String> sourcePartiton = new HashMap<>();
+        sourcePartiton.put(KEY_QUEUE_NAME, queueName);
+        Map<String, String> sourceOffset = new HashMap<>();
+        sourceOffset.put(KEY_RECEIPT_HANDLE, popMsg.getReceiptHandle());
+        RecordPartition recordPartition = new RecordPartition(sourcePartiton);
+        RecordOffset recordOffset = new RecordOffset(sourceOffset);
+        return new RecordPosition(recordPartition, recordOffset);
+    }
+
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSRecordConverImpl.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSRecordConverImpl.java
new file mode 100644
index 0000000..078860f
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSRecordConverImpl.java
@@ -0,0 +1,47 @@
+package org.apache.rocketmq.connect.mns.source;
+
+import com.aliyun.mns.model.Message;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MNSRecordConverImpl extends AbstractMNSRecordConvert {
+
+
+    @Override
+    public void fillCloudEventsKey(ConnectRecord connectRecord, String regionId, String accountId, String queueName, Message popMsg, boolean isBase64Secode) {
+        String messageBodyValue = "";
+        if (isBase64Secode) {
+            messageBodyValue = new String(popMsg.getMessageBodyAsBytes(), StandardCharsets.UTF_8);
+        } else {
+            messageBodyValue = new String(popMsg.getMessageBodyAsRawBytes(), StandardCharsets.UTF_8);
+        }
+        JsonElement messageBody = null;
+        try {
+            messageBody = parseToJsonElement(messageBodyValue);
+        } catch (Exception e) {
+            messageBody = new JsonPrimitive(messageBodyValue);
+        }
+        Map<String, Object> mnsDataMap = new HashMap<>();
+        mnsDataMap.put("requestId", popMsg.getRequestId());
+        mnsDataMap.put("messageId", popMsg.getMessageId());
+        mnsDataMap.put("messageBody", messageBody);
+        connectRecord.setData(new Gson().toJson(mnsDataMap).getBytes(StandardCharsets.UTF_8));
+        connectRecord.setSchema(SchemaBuilder.bytes().build());
+    }
+
+    private JsonElement parseToJsonElement(String messageBodyValue) {
+        try {
+            return JsonParser.parseString(messageBodyValue);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnector.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnector.java
new file mode 100644
index 0000000..40e095b
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnector.java
@@ -0,0 +1,94 @@
+package org.apache.rocketmq.connect.mns.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.rocketmq.connect.mns.source.constant.MNSConstant.*;
+
+public class MNSSourceConnector extends SourceConnector {
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String accountEndpoint;
+
+    private String queueName;
+
+    private String accountId;
+
+    private String isBase64Decode;
+
+    private Integer batchSize;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> taskConfigList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(ACCESS_KEY_ID, accessKeyId);
+        keyValue.put(ACCESS_KEY_SECRET, accessKeySecret);
+        keyValue.put(ACCOUNT_ENDPOINT, accountEndpoint);
+        keyValue.put(QUEUE_NAME, queueName);
+        keyValue.put(ACCOUNT_ID, accountId);
+        if (batchSize == null) {
+            keyValue.put(BATCH_SIZE, 8);
+        }
+        keyValue.put(IS_BASE64_DECODE, isBase64Decode);
+        taskConfigList.add(keyValue);
+        return taskConfigList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MNSSourceTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(ACCESS_KEY_ID))
+                || StringUtils.isBlank(config.getString(ACCESS_KEY_SECRET))
+                || StringUtils.isBlank(config.getString(ACCOUNT_ENDPOINT))
+                || StringUtils.isBlank(config.getString(QUEUE_NAME))
+                || StringUtils.isBlank(config.getString(ACCOUNT_ID))) {
+            throw new RuntimeException("mns required parameter is null !");
+        }
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        accessKeyId = config.getString(ACCESS_KEY_ID);
+        accessKeySecret = config.getString(ACCESS_KEY_SECRET);
+        accountEndpoint = config.getString(ACCOUNT_ENDPOINT);
+        queueName = config.getString(QUEUE_NAME);
+        batchSize = config.getInt(BATCH_SIZE, 8);
+        accountId = config.getString(ACCOUNT_ID);
+        isBase64Decode = config.getString(IS_BASE64_DECODE, "true");
+    }
+
+    @Override
+    public void start(ConnectorContext connectorContext) {
+        super.start(connectorContext);
+    }
+
+    @Override
+    public void stop() {
+
+    }
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
new file mode 100644
index 0000000..ce4a588
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
@@ -0,0 +1,155 @@
+package org.apache.rocketmq.connect.mns.source;
+
+import com.aliyun.mns.client.CloudAccount;
+import com.aliyun.mns.client.CloudQueue;
+import com.aliyun.mns.client.MNSClient;
+import com.aliyun.mns.common.ClientException;
+import com.aliyun.mns.common.ServiceException;
+import com.aliyun.mns.model.Message;
+import com.aliyun.mns.model.PagingListResult;
+import com.aliyun.mns.model.QueueMeta;
+import org.apache.rocketmq.connect.mns.source.utils.AliyunMnsUtil;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.rocketmq.connect.mns.source.constant.MNSConstant.*;
+
+public class MNSSourceTask extends SourceTask {
+    private static final Logger log = LoggerFactory.getLogger(MNSSourceTask.class);
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String accountEndpoint;
+
+    private String queueName;
+
+    private String accountId;
+
+    private String isBase64Decode;
+
+    private MNSClient mnsClient;
+
+    private CloudQueue cloudQueue;
+
+    private Integer batchSize;
+
+    private AbstractMNSRecordConvert abstractMNSRecordConvert;
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        List<ConnectRecord> result = new ArrayList<>(11);
+        try{
+            List<Message> messageList = cloudQueue.batchPopMessage(batchSize);
+            if (messageList != null && !messageList.isEmpty()) {
+                messageList.forEach(message -> {
+                    result.addAll(this.abstractMNSRecordConvert.toConnectRecord(
+                            AliyunMnsUtil.parseRegionIdFromEndpoint(accountEndpoint), accountId, queueName, message, Boolean.parseBoolean(isBase64Decode)));
+                });
+            }
+        } catch (ClientException ce) {
+            log.error("Something wrong with the network connection between client and MNS service."
+                    + "Please check your network and DNS availability.", ce);
+        } catch (ServiceException se) {
+            log.error("MNS exception requestId: " + se.getRequestId(), se);
+            if (se.getErrorCode().equals("QueueNotExist")) {
+                log.error("Queue is not exist.Please create before use");
+            } else if (se.getErrorCode().equals("TimeExpired")) {
+                log.error("The request is time expired. Please check your local machine timeclock");
+            }
+        } catch (Exception e) {
+            log.error("Unknown exception happened! ", e);
+        }
+        return result;
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(ACCESS_KEY_ID))
+                || StringUtils.isBlank(config.getString(ACCESS_KEY_SECRET))
+                || StringUtils.isBlank(config.getString(ACCOUNT_ENDPOINT))
+                || StringUtils.isBlank(config.getString(QUEUE_NAME))
+                || StringUtils.isBlank(config.getString(ACCOUNT_ID))) {
+            throw new RuntimeException("mns required parameter is null !");
+        }
+        // 检测队列名称是否存在
+        PagingListResult<QueueMeta> queueMetaPagingListResult = mnsClient.listQueue(queueName, null, 1);
+        List<QueueMeta> result = queueMetaPagingListResult.getResult();
+        if (result == null || result.isEmpty()) {
+            throw new RuntimeException("mns queue name parameter is null !");
+        }
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        accessKeyId = config.getString(ACCESS_KEY_ID);
+        accessKeySecret = config.getString(ACCESS_KEY_SECRET);
+        accountEndpoint = config.getString(ACCOUNT_ENDPOINT);
+        queueName = config.getString(QUEUE_NAME);
+        batchSize = config.getInt(BATCH_SIZE, 8);
+        accountId = config.getString(ACCOUNT_ID);
+        isBase64Decode = config.getString(IS_BASE64_DECODE, "true");
+        abstractMNSRecordConvert = new MNSRecordConverImpl();
+    }
+
+    @Override
+    public void commit(final List<ConnectRecord> connectRecords) throws InterruptedException {
+        if (connectRecords == null || connectRecords.isEmpty()) {
+            return;
+        }
+        Set<String> receiptHandlesSet = new HashSet<>(connectRecords.size());
+        try {
+            connectRecords.forEach(connectRecord -> receiptHandlesSet.add(connectRecord.getExtension(AbstractMNSRecordConvert.KEY_RECEIPT_HANDLE)));
+            List<String> receiptHandles = new ArrayList<>(receiptHandlesSet.size());
+            receiptHandles.addAll(receiptHandlesSet);
+            cloudQueue.batchDeleteMessage(receiptHandles);
+        } catch (Exception e) {
+            log.error("MNSSourceTask | commit | error => ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void commit() {
+        super.commit();
+    }
+
+    @Override
+    public void start(SourceTaskContext sourceTaskContext) {
+        super.start(sourceTaskContext);
+        try {
+            CloudAccount cloudAccount = new CloudAccount(accessKeyId, accessKeySecret, accountEndpoint);
+            mnsClient = cloudAccount.getMNSClient();
+            cloudQueue = mnsClient.getQueueRef(queueName);
+        } catch (Exception e) {
+            log.error("MNSSourceTask | start | error => ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        mnsClient.close();
+    }
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/constant/MNSConstant.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/constant/MNSConstant.java
new file mode 100644
index 0000000..f8ff6d3
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/constant/MNSConstant.java
@@ -0,0 +1,12 @@
+package org.apache.rocketmq.connect.mns.source.constant;
+
+public class MNSConstant {
+
+    public static final String ACCESS_KEY_ID = "accessKeyId";
+    public static final String ACCESS_KEY_SECRET = "accessKeySecret";
+    public static final String ACCOUNT_ENDPOINT = "accountEndpoint";
+    public static final String QUEUE_NAME = "queueName";
+    public static final String ACCOUNT_ID = "accountId";
+    public static final String BATCH_SIZE = "batchSize";
+    public static final String IS_BASE64_DECODE = "isBase64Decode";
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/utils/AliyunMnsUtil.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/utils/AliyunMnsUtil.java
new file mode 100644
index 0000000..aae3008
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/utils/AliyunMnsUtil.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.connect.mns.source.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class AliyunMnsUtil {
+
+    private static final int ARRAY_LENGTH = 3;
+
+    public static String parseRegionIdFromEndpoint(String endpoint) {
+        String regionId = null;
+        if (StringUtils.isBlank(endpoint)) {
+            return regionId;
+        }
+        String[] split = endpoint.split("\\.");
+        if (split.length >= ARRAY_LENGTH) {
+            regionId = split[2];
+        }
+        return regionId;
+    }
+
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/test/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnectorTest.java b/connectors/aliyun/rocketmq-connect-mns/src/test/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnectorTest.java
new file mode 100644
index 0000000..acfb016
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/test/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnectorTest.java
@@ -0,0 +1,53 @@
+package org.apache.rocketmq.connect.mns.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.mns.source.constant.MNSConstant;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+
+public class MNSSourceConnectorTest {
+
+    private final MNSSourceConnector mnsSourceConnector = new MNSSourceConnector();
+
+    @Test
+    public void testTaskConfigs() {
+        Assert.assertEquals(mnsSourceConnector.taskConfigs(1).size(), 1);
+    }
+
+    @Test
+    public void testPut() throws InterruptedException {
+        MNSSourceTask mnsSourceTask = new MNSSourceTask();
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(MNSConstant.ACCESS_KEY_ID, "xxxx");
+        keyValue.put(MNSConstant.ACCESS_KEY_SECRET, "xxxx");
+        keyValue.put(MNSConstant.ACCOUNT_ENDPOINT, "xxxx");
+        keyValue.put(MNSConstant.ACCOUNT_ID, "xxxx");
+        keyValue.put(MNSConstant.QUEUE_NAME, "xxxx");
+        mnsSourceTask.init(keyValue);
+        mnsSourceTask.start(new SourceTaskContext() {
+            @Override
+            public OffsetStorageReader offsetStorageReader() {
+                return null;
+            }
+
+            @Override
+            public String getConnectorName() {
+                return null;
+            }
+
+            @Override
+            public String getTaskName() {
+                return null;
+            }
+        });
+        List<ConnectRecord> poll = mnsSourceTask.poll();
+        mnsSourceTask.commit(poll);
+    }
+}