You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/04/15 05:55:30 UTC
[rocketmq-connect] branch master updated: [ISSUE #11] Add mns source task (#1)
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 07aa6e6 [ISSUE #11] Add mns source task (#1)
07aa6e6 is described below
commit 07aa6e6d613164840dd329a08af1744c155d623e
Author: zhaohai <33...@users.noreply.github.com>
AuthorDate: Fri Apr 15 13:55:25 2022 +0800
[ISSUE #11] Add mns source task (#1)
* add mns task
* add log
* update log4j
* add constant
* update project structure
* update pom
* update log4j
* add description
* delete files
* add validate queue name and duplicate code
* delete cloud events
* update group path
* add body transform
* update pom
* delete param
* delete param
* add put test
---
connectors/aliyun/rocketmq-connect-mns/README.md | 48 +++++
connectors/aliyun/rocketmq-connect-mns/pom.xml | 194 +++++++++++++++++++++
.../mns/source/AbstractMNSRecordConvert.java | 46 +++++
.../connect/mns/source/MNSRecordConverImpl.java | 47 +++++
.../connect/mns/source/MNSSourceConnector.java | 94 ++++++++++
.../rocketmq/connect/mns/source/MNSSourceTask.java | 155 ++++++++++++++++
.../connect/mns/source/constant/MNSConstant.java | 12 ++
.../connect/mns/source/utils/AliyunMnsUtil.java | 21 +++
.../connect/mns/source/MNSSourceConnectorTest.java | 53 ++++++
9 files changed, 670 insertions(+)
diff --git a/connectors/aliyun/rocketmq-connect-mns/README.md b/connectors/aliyun/rocketmq-connect-mns/README.md
new file mode 100644
index 0000000..8c2effd
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/README.md
@@ -0,0 +1,48 @@
+# rocketmq-connect-mns
+* **rocketmq-connect-mns** 说明
+```
+It is responsible for obtaining data from the message service MNS and sending it to rocketmq through producer.
+```
+
+## rocketmq-connect-mns 打包
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## rocketmq-connect-mns 启动
+
+* **mns-source-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-mns-source-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}",queueName”:"${queueName}","accountId":"${accountId}","batchSize":"${batchSize}","isBase64Decode":"${isBase64Decode}"}
+```
+
+例子
+
+```
+http://localhost:8081/connectors/mnsConnectorSource?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
+"connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","accountEndpoint":"xxxx","queueName":"xxxx",
+"accountId":"xxxx","batchSize":"8","isBase64Decode":"true"}
+```
+
+>**注:** `rocketmq-mns-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-mns 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-mns-connector-name}/stop
+```
+
+## rocketmq-connect-mns 参数说明
+* **mns-source-connector 参数说明**
+
+| KEY | TYPE | Must be filled | Description | Example
+|-----------------|---------|----------------|-------------------------|----------|
+| accessKeyId | String | YES | 阿里云身份验证,在阿里云用户信息管理控制台获取 | xxxx |
+| accessKeySecret | String | YES | 阿里云身份验证,在阿里云用户信息管理控制台获取 | xxxx |
+| accountEndpoint | String | YES | 阿里云MNS官方接入点 | xxxx |
+| queueName | String | YES | 队列名称 | xxxx |
+| accountId | String | YES | 阿里云yourAccountId | 10000000 |
+| batchSize | Integer | NO | 批量接受消息数量 | 8 |
+| isBase64Decode | String | NO | 是否开启Base64解码 | true |
diff --git a/connectors/aliyun/rocketmq-connect-mns/pom.xml b/connectors/aliyun/rocketmq-connect-mns/pom.xml
new file mode 100644
index 0000000..c41c53a
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/pom.xml
@@ -0,0 +1,194 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-mns</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+
+ <name>connect-mns</name>
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <slf4j.version>1.7.7</slf4j.version>
+ <logback.version>1.0.13</logback.version>
+ <openmessaging-connector.version>0.1.2-SNAPSHOT</openmessaging-connector.version>
+ <aliyun-sdk-mns.version>1.1.9.1</aliyun-sdk-mns.version>
+ <gson.version>2.9.0</gson.version>
+ <junit.version>4.12</junit.version>
+ <assertj.version>3.22.0</assertj.version>
+ <mockito.version>4.0.0</mockito.version>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.rocketmq.connect.mns.source.MNSSourceConnector</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.aliyun.mns</groupId>
+ <artifactId>aliyun-sdk-mns</artifactId>
+ <version>${aliyun-sdk-mns.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>${openmessaging-connector.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/AbstractMNSRecordConvert.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/AbstractMNSRecordConvert.java
new file mode 100644
index 0000000..cd0c0de
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/AbstractMNSRecordConvert.java
@@ -0,0 +1,46 @@
+package org.apache.rocketmq.connect.mns.source;
+
+import com.aliyun.mns.model.Message;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.RecordPosition;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractMNSRecordConvert {
+
+ protected static final String KEY_QUEUE_NAME = "MNS:QueueName";
+ protected static final String KEY_RECEIPT_HANDLE = "ReceiptHandle";
+ protected static final String KEY_MESSAGE_ID = "MessageId";
+
+ protected static final String ACS_MNS= "acs:mns";
+ protected static final String MNS_QUEUE_SEND_MESSAGE = "mns:Queue:SendMessage";
+
+ protected List<ConnectRecord> toConnectRecord(String regionId, String accountId, String queueName, Message popMsg, boolean isBase64Secode) {
+ RecordPosition recordPosition = buildRecordPosition(queueName, popMsg);
+ ConnectRecord connectRecord = new ConnectRecord(recordPosition.getPartition(), recordPosition.getOffset(), popMsg.getEnqueueTime().getTime());
+ connectRecord.addExtension(KEY_RECEIPT_HANDLE, popMsg.getReceiptHandle());
+ connectRecord.addExtension(KEY_MESSAGE_ID, popMsg.getMessageId());
+ List<ConnectRecord> connectRecordList = new ArrayList<>();
+ connectRecordList.add(connectRecord);
+ fillCloudEventsKey(connectRecord, regionId, accountId, queueName, popMsg, isBase64Secode);
+ return connectRecordList;
+ }
+
+ protected abstract void fillCloudEventsKey(ConnectRecord connectRecord, String regionId, String accountId, String queueName, Message popMsg, boolean isBase64Secode);
+
+ protected static RecordPosition buildRecordPosition(String queueName, Message popMsg) {
+ Map<String, String> sourcePartiton = new HashMap<>();
+ sourcePartiton.put(KEY_QUEUE_NAME, queueName);
+ Map<String, String> sourceOffset = new HashMap<>();
+ sourceOffset.put(KEY_RECEIPT_HANDLE, popMsg.getReceiptHandle());
+ RecordPartition recordPartition = new RecordPartition(sourcePartiton);
+ RecordOffset recordOffset = new RecordOffset(sourceOffset);
+ return new RecordPosition(recordPartition, recordOffset);
+ }
+
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSRecordConverImpl.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSRecordConverImpl.java
new file mode 100644
index 0000000..078860f
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSRecordConverImpl.java
@@ -0,0 +1,47 @@
+package org.apache.rocketmq.connect.mns.source;
+
+import com.aliyun.mns.model.Message;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MNSRecordConverImpl extends AbstractMNSRecordConvert {
+
+
+ @Override
+ public void fillCloudEventsKey(ConnectRecord connectRecord, String regionId, String accountId, String queueName, Message popMsg, boolean isBase64Secode) {
+ String messageBodyValue = "";
+ if (isBase64Secode) {
+ messageBodyValue = new String(popMsg.getMessageBodyAsBytes(), StandardCharsets.UTF_8);
+ } else {
+ messageBodyValue = new String(popMsg.getMessageBodyAsRawBytes(), StandardCharsets.UTF_8);
+ }
+ JsonElement messageBody = null;
+ try {
+ messageBody = parseToJsonElement(messageBodyValue);
+ } catch (Exception e) {
+ messageBody = new JsonPrimitive(messageBodyValue);
+ }
+ Map<String, Object> mnsDataMap = new HashMap<>();
+ mnsDataMap.put("requestId", popMsg.getRequestId());
+ mnsDataMap.put("messageId", popMsg.getMessageId());
+ mnsDataMap.put("messageBody", messageBody);
+ connectRecord.setData(new Gson().toJson(mnsDataMap).getBytes(StandardCharsets.UTF_8));
+ connectRecord.setSchema(SchemaBuilder.bytes().build());
+ }
+
+ private JsonElement parseToJsonElement(String messageBodyValue) {
+ try {
+ return JsonParser.parseString(messageBodyValue);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnector.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnector.java
new file mode 100644
index 0000000..40e095b
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnector.java
@@ -0,0 +1,94 @@
+package org.apache.rocketmq.connect.mns.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.rocketmq.connect.mns.source.constant.MNSConstant.*;
+
+public class MNSSourceConnector extends SourceConnector {
+
+ private String accessKeyId;
+
+ private String accessKeySecret;
+
+ private String accountEndpoint;
+
+ private String queueName;
+
+ private String accountId;
+
+ private String isBase64Decode;
+
+ private Integer batchSize;
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+
+ @Override
+ public List<KeyValue> taskConfigs(int maxTasks) {
+ List<KeyValue> taskConfigList = new ArrayList<>(11);
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(ACCESS_KEY_ID, accessKeyId);
+ keyValue.put(ACCESS_KEY_SECRET, accessKeySecret);
+ keyValue.put(ACCOUNT_ENDPOINT, accountEndpoint);
+ keyValue.put(QUEUE_NAME, queueName);
+ keyValue.put(ACCOUNT_ID, accountId);
+ if (batchSize == null) {
+ keyValue.put(BATCH_SIZE, 8);
+ }
+ keyValue.put(IS_BASE64_DECODE, isBase64Decode);
+ taskConfigList.add(keyValue);
+ return taskConfigList;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return MNSSourceTask.class;
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+ if (StringUtils.isBlank(config.getString(ACCESS_KEY_ID))
+ || StringUtils.isBlank(config.getString(ACCESS_KEY_SECRET))
+ || StringUtils.isBlank(config.getString(ACCOUNT_ENDPOINT))
+ || StringUtils.isBlank(config.getString(QUEUE_NAME))
+ || StringUtils.isBlank(config.getString(ACCOUNT_ID))) {
+ throw new RuntimeException("mns required parameter is null !");
+ }
+ }
+
+ @Override
+ public void init(KeyValue config) {
+ accessKeyId = config.getString(ACCESS_KEY_ID);
+ accessKeySecret = config.getString(ACCESS_KEY_SECRET);
+ accountEndpoint = config.getString(ACCOUNT_ENDPOINT);
+ queueName = config.getString(QUEUE_NAME);
+ batchSize = config.getInt(BATCH_SIZE, 8);
+ accountId = config.getString(ACCOUNT_ID);
+ isBase64Decode = config.getString(IS_BASE64_DECODE, "true");
+ }
+
+ @Override
+ public void start(ConnectorContext connectorContext) {
+ super.start(connectorContext);
+ }
+
+ @Override
+ public void stop() {
+
+ }
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
new file mode 100644
index 0000000..ce4a588
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
@@ -0,0 +1,155 @@
+package org.apache.rocketmq.connect.mns.source;
+
+import com.aliyun.mns.client.CloudAccount;
+import com.aliyun.mns.client.CloudQueue;
+import com.aliyun.mns.client.MNSClient;
+import com.aliyun.mns.common.ClientException;
+import com.aliyun.mns.common.ServiceException;
+import com.aliyun.mns.model.Message;
+import com.aliyun.mns.model.PagingListResult;
+import com.aliyun.mns.model.QueueMeta;
+import org.apache.rocketmq.connect.mns.source.utils.AliyunMnsUtil;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.rocketmq.connect.mns.source.constant.MNSConstant.*;
+
+public class MNSSourceTask extends SourceTask {
+ private static final Logger log = LoggerFactory.getLogger(MNSSourceTask.class);
+
+ private String accessKeyId;
+
+ private String accessKeySecret;
+
+ private String accountEndpoint;
+
+ private String queueName;
+
+ private String accountId;
+
+ private String isBase64Decode;
+
+ private MNSClient mnsClient;
+
+ private CloudQueue cloudQueue;
+
+ private Integer batchSize;
+
+ private AbstractMNSRecordConvert abstractMNSRecordConvert;
+
+ @Override
+ public List<ConnectRecord> poll() throws InterruptedException {
+ List<ConnectRecord> result = new ArrayList<>(11);
+ try{
+ List<Message> messageList = cloudQueue.batchPopMessage(batchSize);
+ if (messageList != null && !messageList.isEmpty()) {
+ messageList.forEach(message -> {
+ result.addAll(this.abstractMNSRecordConvert.toConnectRecord(
+ AliyunMnsUtil.parseRegionIdFromEndpoint(accountEndpoint), accountId, queueName, message, Boolean.parseBoolean(isBase64Decode)));
+ });
+ }
+ } catch (ClientException ce) {
+ log.error("Something wrong with the network connection between client and MNS service."
+ + "Please check your network and DNS availability.", ce);
+ } catch (ServiceException se) {
+ log.error("MNS exception requestId: " + se.getRequestId(), se);
+ if (se.getErrorCode().equals("QueueNotExist")) {
+ log.error("Queue is not exist.Please create before use");
+ } else if (se.getErrorCode().equals("TimeExpired")) {
+ log.error("The request is time expired. Please check your local machine timeclock");
+ }
+ } catch (Exception e) {
+ log.error("Unknown exception happened! ", e);
+ }
+ return result;
+ }
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+ if (StringUtils.isBlank(config.getString(ACCESS_KEY_ID))
+ || StringUtils.isBlank(config.getString(ACCESS_KEY_SECRET))
+ || StringUtils.isBlank(config.getString(ACCOUNT_ENDPOINT))
+ || StringUtils.isBlank(config.getString(QUEUE_NAME))
+ || StringUtils.isBlank(config.getString(ACCOUNT_ID))) {
+ throw new RuntimeException("mns required parameter is null !");
+ }
+ // 检测队列名称是否存在
+ PagingListResult<QueueMeta> queueMetaPagingListResult = mnsClient.listQueue(queueName, null, 1);
+ List<QueueMeta> result = queueMetaPagingListResult.getResult();
+ if (result == null || result.isEmpty()) {
+ throw new RuntimeException("mns queue name parameter is null !");
+ }
+ }
+
+ @Override
+ public void init(KeyValue config) {
+ accessKeyId = config.getString(ACCESS_KEY_ID);
+ accessKeySecret = config.getString(ACCESS_KEY_SECRET);
+ accountEndpoint = config.getString(ACCOUNT_ENDPOINT);
+ queueName = config.getString(QUEUE_NAME);
+ batchSize = config.getInt(BATCH_SIZE, 8);
+ accountId = config.getString(ACCOUNT_ID);
+ isBase64Decode = config.getString(IS_BASE64_DECODE, "true");
+ abstractMNSRecordConvert = new MNSRecordConverImpl();
+ }
+
+ @Override
+ public void commit(final List<ConnectRecord> connectRecords) throws InterruptedException {
+ if (connectRecords == null || connectRecords.isEmpty()) {
+ return;
+ }
+ Set<String> receiptHandlesSet = new HashSet<>(connectRecords.size());
+ try {
+ connectRecords.forEach(connectRecord -> receiptHandlesSet.add(connectRecord.getExtension(AbstractMNSRecordConvert.KEY_RECEIPT_HANDLE)));
+ List<String> receiptHandles = new ArrayList<>(receiptHandlesSet.size());
+ receiptHandles.addAll(receiptHandlesSet);
+ cloudQueue.batchDeleteMessage(receiptHandles);
+ } catch (Exception e) {
+ log.error("MNSSourceTask | commit | error => ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void commit() {
+ super.commit();
+ }
+
+ @Override
+ public void start(SourceTaskContext sourceTaskContext) {
+ super.start(sourceTaskContext);
+ try {
+ CloudAccount cloudAccount = new CloudAccount(accessKeyId, accessKeySecret, accountEndpoint);
+ mnsClient = cloudAccount.getMNSClient();
+ cloudQueue = mnsClient.getQueueRef(queueName);
+ } catch (Exception e) {
+ log.error("MNSSourceTask | start | error => ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ mnsClient.close();
+ }
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/constant/MNSConstant.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/constant/MNSConstant.java
new file mode 100644
index 0000000..f8ff6d3
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/constant/MNSConstant.java
@@ -0,0 +1,12 @@
+package org.apache.rocketmq.connect.mns.source.constant;
+
+public class MNSConstant {
+
+ public static final String ACCESS_KEY_ID = "accessKeyId";
+ public static final String ACCESS_KEY_SECRET = "accessKeySecret";
+ public static final String ACCOUNT_ENDPOINT = "accountEndpoint";
+ public static final String QUEUE_NAME = "queueName";
+ public static final String ACCOUNT_ID = "accountId";
+ public static final String BATCH_SIZE = "batchSize";
+ public static final String IS_BASE64_DECODE = "isBase64Decode";
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/utils/AliyunMnsUtil.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/utils/AliyunMnsUtil.java
new file mode 100644
index 0000000..aae3008
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/utils/AliyunMnsUtil.java
@@ -0,0 +1,21 @@
+package org.apache.rocketmq.connect.mns.source.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class AliyunMnsUtil {
+
+ private static final int ARRAY_LENGTH = 3;
+
+ public static String parseRegionIdFromEndpoint(String endpoint) {
+ String regionId = null;
+ if (StringUtils.isBlank(endpoint)) {
+ return regionId;
+ }
+ String[] split = endpoint.split("\\.");
+ if (split.length >= ARRAY_LENGTH) {
+ regionId = split[2];
+ }
+ return regionId;
+ }
+
+}
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/test/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnectorTest.java b/connectors/aliyun/rocketmq-connect-mns/src/test/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnectorTest.java
new file mode 100644
index 0000000..acfb016
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-mns/src/test/java/org/apache/rocketmq/connect/mns/source/MNSSourceConnectorTest.java
@@ -0,0 +1,53 @@
+package org.apache.rocketmq.connect.mns.source;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.storage.OffsetStorageReader;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.mns.source.constant.MNSConstant;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+
+public class MNSSourceConnectorTest {
+
+ private final MNSSourceConnector mnsSourceConnector = new MNSSourceConnector();
+
+ @Test
+ public void testTaskConfigs() {
+ Assert.assertEquals(mnsSourceConnector.taskConfigs(1).size(), 1);
+ }
+
+ @Test
+ public void testPut() throws InterruptedException {
+ MNSSourceTask mnsSourceTask = new MNSSourceTask();
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(MNSConstant.ACCESS_KEY_ID, "xxxx");
+ keyValue.put(MNSConstant.ACCESS_KEY_SECRET, "xxxx");
+ keyValue.put(MNSConstant.ACCOUNT_ENDPOINT, "xxxx");
+ keyValue.put(MNSConstant.ACCOUNT_ID, "xxxx");
+ keyValue.put(MNSConstant.QUEUE_NAME, "xxxx");
+ mnsSourceTask.init(keyValue);
+ mnsSourceTask.start(new SourceTaskContext() {
+ @Override
+ public OffsetStorageReader offsetStorageReader() {
+ return null;
+ }
+
+ @Override
+ public String getConnectorName() {
+ return null;
+ }
+
+ @Override
+ public String getTaskName() {
+ return null;
+ }
+ });
+ List<ConnectRecord> poll = mnsSourceTask.poll();
+ mnsSourceTask.commit(poll);
+ }
+}