Posted to commits@rocketmq.apache.org by sh...@apache.org on 2022/06/07 07:06:10 UTC

[rocketmq-connect] branch master updated (ffa4cdb -> a110c5d)

This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


    from ffa4cdb  update README.md
     new 395c3f3  Remove redundant verification, complete document example parameters, and optimize RocketMQ's key and tag acquisition logic.
     new a110c5d  add project.build.sourceEncoding

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../aliyun/rocketmq-connect-dingtalk/README.md     |  6 +-
 .../aliyun/rocketmq-connect-dingtalk/pom.xml       |  1 +
 .../connect/dingtalk/sink/DingTalkSinkTask.java    |  5 --
 connectors/aliyun/rocketmq-connect-fc/README.md    |  5 +-
 connectors/aliyun/rocketmq-connect-fc/pom.xml      |  1 +
 .../rocketmq/connect/fc/sink/FcSinkTask.java       |  8 ---
 connectors/aliyun/rocketmq-connect-mns/README.md   |  5 +-
 .../rocketmq/connect/mns/source/MNSSourceTask.java |  8 ---
 .../aliyun/rocketmq-connect-rocketmq/README.md     | 24 ++++----
 .../connect/rocketmq/RocketMQSinkConnector.java    | 25 ++++++++
 .../connect/rocketmq/RocketMQSinkTask.java         | 39 ++----------
 .../connect/rocketmq/RocketMQSourceConnector.java  | 34 +++++++++++
 .../connect/rocketmq/RocketMQSourceTask.java       | 70 ++++++----------------
 .../connect/rocketmq/common/RocketMQConstant.java  |  4 ++
 .../rocketmq/RocketMQSinkConnectorTest.java        |  3 +-
 .../rocketmq/RocketMQSourceConnectorTest.java      | 12 ++++
 connectors/rocketmq-connect-http/README.md         |  5 +-
 .../rocketmq/connect/http/sink/HttpSinkTask.java   |  8 +--
 18 files changed, 129 insertions(+), 134 deletions(-)


[rocketmq-connect] 01/02: Remove redundant verification, complete document example parameters, and optimize RocketMQ's key and tag acquisition logic.

shenlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 395c3f325828bce0497fa5f54a385f071d02484d
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Fri Apr 22 14:04:13 2022 +0800

    Remove redundant verification, complete document example parameters, and optimize RocketMQ's key and tag acquisition logic.
---
 .../aliyun/rocketmq-connect-dingtalk/README.md     |  6 +-
 .../connect/dingtalk/sink/DingTalkSinkTask.java    |  5 --
 connectors/aliyun/rocketmq-connect-fc/README.md    |  5 +-
 .../rocketmq/connect/fc/sink/FcSinkTask.java       |  8 ---
 connectors/aliyun/rocketmq-connect-mns/README.md   |  5 +-
 .../rocketmq/connect/mns/source/MNSSourceTask.java |  8 ---
 .../aliyun/rocketmq-connect-rocketmq/README.md     | 24 ++++----
 .../connect/rocketmq/RocketMQSinkConnector.java    | 25 ++++++++
 .../connect/rocketmq/RocketMQSinkTask.java         | 39 ++----------
 .../connect/rocketmq/RocketMQSourceConnector.java  | 34 +++++++++++
 .../connect/rocketmq/RocketMQSourceTask.java       | 70 ++++++----------------
 .../connect/rocketmq/common/RocketMQConstant.java  |  4 ++
 .../rocketmq/RocketMQSinkConnectorTest.java        |  3 +-
 .../rocketmq/RocketMQSourceConnectorTest.java      | 12 ++++
 connectors/rocketmq-connect-http/README.md         |  5 +-
 .../rocketmq/connect/http/sink/HttpSinkTask.java   |  8 +--
 16 files changed, 127 insertions(+), 134 deletions(-)
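
Reader's note: the "key and tag acquisition" change named in the commit message reads both values from the record's extensions and sets them on the outgoing message, then copies every extension into the user properties. The sketch below illustrates that flow with plain Java collections. `SketchMessage`, `KeyTagSketch`, and the extension names "key" and "tag" are assumptions made for illustration: the real constants live in RocketMQConstant, whose values are not shown in this email. The blank-value check is also an addition of this sketch; the committed code passes the extension values through unchecked.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the ONS Message class; only the fields this sketch needs.
final class SketchMessage {
    String key;
    String tag;
    final Map<String, String> userProperties = new HashMap<>();
}

final class KeyTagSketch {
    // Assumed extension names; the actual values of RocketMQConstant.KEY / TAG are not shown here.
    static final String KEY = "key";
    static final String TAG = "tag";

    // Builds a message from record extensions, skipping missing or blank key/tag values.
    static SketchMessage toMessage(Map<String, String> extensions) {
        SketchMessage message = new SketchMessage();
        if (extensions == null) {
            return message; // no extensions: leave key, tag, and properties unset
        }
        String key = extensions.get(KEY);
        if (key != null && !key.trim().isEmpty()) {
            message.key = key;
        }
        String tag = extensions.get(TAG);
        if (tag != null && !tag.trim().isEmpty()) {
            message.tag = tag;
        }
        // As in the commit, every extension (key and tag included) is copied to user properties.
        message.userProperties.putAll(extensions);
        return message;
    }

    public static void main(String[] args) {
        Map<String, String> extensions = new HashMap<>();
        extensions.put(KEY, "order-42");
        extensions.put("traceId", "abc");
        SketchMessage message = toMessage(extensions);
        System.out.println(message.key + " " + message.tag + " " + message.userProperties.size());
    }
}
```

Whether a null key or tag is acceptable to the real producer is not established by this email, so the guard above is a defensive choice rather than a documented requirement.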

diff --git a/connectors/aliyun/rocketmq-connect-dingtalk/README.md b/connectors/aliyun/rocketmq-connect-dingtalk/README.md
index 127036a..2ab5385 100644
--- a/connectors/aliyun/rocketmq-connect-dingtalk/README.md
+++ b/connectors/aliyun/rocketmq-connect-dingtalk/README.md
@@ -15,13 +15,13 @@ mvn clean install -Dmaven.test.skip=true
 
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-ding-talk-sink-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector",“webHook”:"${webHook}",“msgtype”:"${msgtype}","secretKey":"${secretKey}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector","connect-topicname" : "${connect-topicname}","webHook":"${webHook}","msgtype":"${msgtype}","secretKey":"${secretKey}"}
 ```
 
 例子 
 ```
 http://localhost:8081/connectors/dingTalkConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector","webHook":"192.168.1.2","msgtype":"text","secretKey":"xxxx"}
+"connector-class":"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkConnector","connect-topicname" : "ding-talk-topic","webHook":"192.168.1.2","msgtype":"text","secretKey":"xxxx"}
 ```
 
 >**注:** `rocketmq-ding-talk-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -40,4 +40,4 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-jdbc-connector-name}/
 |webHook                 | String  | YES            | 机器人的Webhook地址 | https://oapi.dingtalk.com/robot/send?access_token=XXXXXX |
 |msgtype                 | String  | NO             | 消息类型      | text                                                     |             |
 |secretKey               | String  | NO             | 密钥        | SC                                                       |
-
+|connect-topicname       | String  | YES            | sink需要处理数据消息topic                     | xxxx |
diff --git a/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java b/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
index cb4e158..98e52fb 100644
--- a/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-dingtalk/src/main/java/org/apache/rocketmq/connect/dingtalk/sink/DingTalkSinkTask.java
@@ -7,7 +7,6 @@ import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,10 +56,6 @@ public class DingTalkSinkTask extends SinkTask {
 
     @Override
     public void validate(KeyValue config) {
-        if (StringUtils.isBlank(config.getString(DingTalkConstant.WEB_HOOK)) ||
-            StringUtils.isBlank(config.getString(DingTalkConstant.SECRET_KEY))) {
-            throw new RuntimeException("ding talk required parameter is null !");
-        }
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-fc/README.md b/connectors/aliyun/rocketmq-connect-fc/README.md
index b22979e..69dc110 100644
--- a/connectors/aliyun/rocketmq-connect-fc/README.md
+++ b/connectors/aliyun/rocketmq-connect-fc/README.md
@@ -15,13 +15,13 @@ mvn clean install -Dmaven.test.skip=true
 
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-sink-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector",“region”:"${region}",accessKey”:"${accessKey}",accessSecretKey”:"${accessSecretKey}",accountId”:"${accountId}","serviceName":"${serviceName}","functionName":"${functionName}","invocationType":"${invocationType}", "qualifier":"${qualifier}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector","connect-topicname" : "${connect-topicname}",“region”:"${region}",accessKey”:"${accessKey}",accessSecretKey”:"${accessSecretKey}",accountId”:"${accountId}","serviceName":"${serviceName}","functionName":"${functionName}","invocationType":"${invocationType}", "qualifier":"${qualifier}"}
 ```
 
 例子 
 ```
 http://localhost:8081/connectors/fcConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector",“region”:"cn-hangzhou",accessKey”:"xxxx",accessSecretKey”:"xxxx",accountId”:"xxxx","serviceName":"xxxx","functionName":"xxxx","invocationType":"", "qualifier":"LATEST"}
+"connector-class":"org.apache.rocketmq.connect.fc.sink.FcSinkConnector","connect-topicname" : "fc-topic",“region”:"cn-hangzhou",accessKey”:"xxxx",accessSecretKey”:"xxxx",accountId”:"xxxx","serviceName":"xxxx","functionName":"xxxx","invocationType":"", "qualifier":"LATEST"}
 ```
 
 >**注:** `rocketmq-fc-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -45,4 +45,5 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-fc-connector-name}/st
 |functionName            | String  | YES            | 函数名称 | xxxx |
 |invocationType          | String | NO             | 同步或者异步                           | null |
 |qualifier               | String | NO             | 服务版本和别名                          | LATEST |
+|connect-topicname       | String  | YES            | sink需要处理数据消息topic                     | xxxx |
 
diff --git a/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java b/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java
index 8080b82..a56739f 100644
--- a/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-fc/src/main/java/org/apache/rocketmq/connect/fc/sink/FcSinkTask.java
@@ -80,14 +80,6 @@ public class FcSinkTask extends SinkTask {
 
     @Override
     public void validate(KeyValue config) {
-        if (StringUtils.isBlank(config.getString(FcConstant.REGION_ID_CONSTANT))
-            || StringUtils.isBlank(config.getString(FcConstant.ACCESS_KEY_ID_CONSTANT))
-            || StringUtils.isBlank(config.getString(FcConstant.ACCESS__KEY_SECRET_CONSTANT))
-            || StringUtils.isBlank(config.getString(FcConstant.ACCOUNT_ID_CONSTANT))
-            || StringUtils.isBlank(config.getString(FcConstant.SERVICE_NAME_CONSTANT))
-            || StringUtils.isBlank(config.getString(FcConstant.FUNCTION_NAME_CONSTANT))) {
-            throw new RuntimeException("fc required parameter is null !");
-        }
         try {
             GetServiceRequest getServiceRequest = new GetServiceRequest(config.getString(FcConstant.SERVICE_NAME_CONSTANT));
             getServiceRequest.setQualifier(config.getString(FcConstant.QUALIFIER_CONSTANT));
diff --git a/connectors/aliyun/rocketmq-connect-mns/README.md b/connectors/aliyun/rocketmq-connect-mns/README.md
index 8c2effd..bc143b6 100644
--- a/connectors/aliyun/rocketmq-connect-mns/README.md
+++ b/connectors/aliyun/rocketmq-connect-mns/README.md
@@ -15,14 +15,14 @@ mvn clean install -Dmaven.test.skip=true
 
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-mns-source-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}",queueName”:"${queueName}","accountId":"${accountId}","batchSize":"${batchSize}","isBase64Decode":"${isBase64Decode}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","connect-topicname" : "${connect-topicname}",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}",queueName”:"${queueName}","accountId":"${accountId}","batchSize":"${batchSize}","isBase64Decode":"${isBase64Decode}"}
 ```
 
 例子
 
 ```
 http://localhost:8081/connectors/mnsConnectorSource?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","accountEndpoint":"xxxx","queueName":"xxxx",
+"connector-class":"org.apache.rocketmq.connect.mns.source.MNSSourceConnector","connect-topicname" : "mns-topic","accessKeyId":"xxxx","accessKeySecret":"xxxx","accountEndpoint":"xxxx","queueName":"xxxx",
 "accountId":"xxxx","batchSize":"8","isBase64Decode":"true"}
 ```
 
@@ -46,3 +46,4 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-mns-connector-name}/s
 | accountId       | String  | YES            | 阿里云yourAccountId        | 10000000 |
 | batchSize       | Integer | NO            | 批量接受消息数量                | 8        |
 | isBase64Decode  | String  | NO             | 是否开启Base64解码            | true     |
+|connect-topicname       | String  | YES            | source需要处理数据消息topic     | xxxx |
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
index ce4a588..d54092c 100644
--- a/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
+++ b/connectors/aliyun/rocketmq-connect-mns/src/main/java/org/apache/rocketmq/connect/mns/source/MNSSourceTask.java
@@ -13,7 +13,6 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,13 +85,6 @@ public class MNSSourceTask extends SourceTask {
 
     @Override
     public void validate(KeyValue config) {
-        if (StringUtils.isBlank(config.getString(ACCESS_KEY_ID))
-                || StringUtils.isBlank(config.getString(ACCESS_KEY_SECRET))
-                || StringUtils.isBlank(config.getString(ACCOUNT_ENDPOINT))
-                || StringUtils.isBlank(config.getString(QUEUE_NAME))
-                || StringUtils.isBlank(config.getString(ACCOUNT_ID))) {
-            throw new RuntimeException("mns required parameter is null !");
-        }
         // 检测队列名称是否存在
         PagingListResult<QueueMeta> queueMetaPagingListResult = mnsClient.listQueue(queueName, null, 1);
         List<QueueMeta> result = queueMetaPagingListResult.getResult();
diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/README.md b/connectors/aliyun/rocketmq-connect-rocketmq/README.md
index cb41006..025a044 100644
--- a/connectors/aliyun/rocketmq-connect-rocketmq/README.md
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/README.md
@@ -11,14 +11,14 @@ mvn clean install -Dmaven.test.skip=true
 
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-source-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}","consumerGroup":"${consumerGroup}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","connect-topicname" : "${connect-topicname}",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}","consumerGroup":"${consumerGroup}"}
 ```
 
 例子
 
 ```
 http://localhost:8081/connectors/rocketmqConnectorSource?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic",
+"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector","connect-topicname" : "rocketmq-source-topic","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic",
 "instanceId":"xxxx", "consumerGroup":"xxxx"}
 ```
 
@@ -26,13 +26,13 @@ http://localhost:8081/connectors/rocketmqConnectorSource?config={"source-rocketm
 
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-sink-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector",“accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","connect-topicname" : "${connect-topicname}", "accessKeyId":"${accessKeyId}", "accessKeySecret":"${accessKeySecret}",namesrvAddr”:"${namesrvAddr}","topic":"${topic}","instanceId":"${instanceId}"}
 ```
 
 例子 
 ```
 http://localhost:8081/connectors/rocketmqConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic",
+"connector-class":"org.apache.rocketmq.connect.rocketmq.RocketMQSinkConnector","connect-topicname" : "rocketmq-sink-topic","accessKeyId":"xxxx","accessKeySecret":"xxxx","namesrvAddr":"http://127.0.0.1:9876","topic":"topic",
 "instanceId":"xxxx"}
 ```
 
@@ -47,14 +47,15 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-connector-na
 ## rocketmq-connect-rocketmq 参数说明
 * **rocketmq-source-connector 参数说明**
 
-|         KEY            |  TYPE   | Must be filled | Description| Example
-|------------------------|---------|----------------|------------|---|
-| accessKeyId           | String  | YES            | AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx    |
-| accessKeySecret       | String  | YES            | AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建 | xxxx    |
+|         KEY            |  TYPE   | Must be filled | Description                                | Example
+|------------------------|---------|----------------|--------------------------------------------|---|
+| accessKeyId           | String  | YES            | AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建         | xxxx    |
+| accessKeySecret       | String  | YES            | AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建     | xxxx    |
 | namesrvAddr           | String  | YES            | 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 | xxxx    |
-| topic                 | String  | YES            | 消息主题          | xxxx    |
-| instanceId            | String  | NO             | 阿里云MQ控制台的实例Id | xxxx    |
-| consumerGroup            | String  | YES            | 消息订阅者 | xxxx    |
+| topic                 | String  | YES            | 消息主题                                       | xxxx    |
+| instanceId            | String  | NO             | 阿里云MQ控制台的实例Id                              | xxxx    |
+| consumerGroup            | String  | YES            | 消息订阅者                                      | xxxx    |
+|connect-topicname       | String  | YES            | source需要处理数据消息topic                        | xxxx |
 
 ```  
 注:1. source/sink配置文件说明是以rocketmq-connect-rocketmq为demo,不同source/sink connector配置有差异,请以具体sourc/sink connector为准
@@ -68,4 +69,5 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-rocketmq-connector-na
 | namesrvAddr           | String  | YES            | 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看 | xxxx    |
 | topic                 | String  | YES            | 消息主题          | xxxx    |
 | instanceId            | String  | NO             | 阿里云MQ控制台的实例Id | xxxx    |
+|connect-topicname       | String  | YES            | sink需要处理数据消息topic                     | xxxx |
 
diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java
index 790c6dd..0d13598 100644
--- a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnector.java
@@ -1,17 +1,25 @@
 package org.apache.rocketmq.connect.rocketmq;
 
 
+import com.aliyun.ons20190214.Client;
+import com.aliyun.ons20190214.models.OnsTopicListRequest;
+import com.aliyun.ons20190214.models.OnsTopicListResponse;
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import com.aliyun.teaopenapi.models.Config;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.Task;
 import io.openmessaging.connector.api.component.task.sink.SinkConnector;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
+import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class RocketMQSinkConnector extends SinkConnector {
+    private static final Logger log = LoggerFactory.getLogger(RocketMQSinkConnector.class);
 
     private String accessKeyId;
 
@@ -59,6 +67,23 @@ public class RocketMQSinkConnector extends SinkConnector {
                 || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))) {
             throw new RuntimeException("rocketmq required parameter is null !");
         }
+        try {
+            Config onsConfig = new Config()
+                    .setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID))
+                    .setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET));
+            onsConfig.endpoint = OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR));
+            final Client client = new Client(onsConfig);
+            OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest()
+                    .setTopic(config.getString(RocketMQConstant.TOPIC))
+                    .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID));
+            final OnsTopicListResponse onsTopicListResponse = client.onsTopicList(onsTopicListRequest);
+            if (onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) {
+                throw new RuntimeException("rocketmq required parameter topic does not exist !");
+            }
+        } catch (Exception e) {
+            log.error("RocketMQSinkTask | validate | error => ", e);
+            throw new RuntimeException(e.getMessage());
+        }
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java
index 6c8eb36..13f2623 100644
--- a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkTask.java
@@ -1,22 +1,18 @@
 package org.apache.rocketmq.connect.rocketmq;
 
-import com.aliyun.ons20190214.Client;
-import com.aliyun.ons20190214.models.OnsTopicListRequest;
-import com.aliyun.ons20190214.models.OnsTopicListResponse;
 import com.aliyun.openservices.ons.api.Message;
 import com.aliyun.openservices.ons.api.ONSFactory;
 import com.aliyun.openservices.ons.api.Producer;
 import com.aliyun.openservices.ons.api.PropertyKeyConst;
+import com.aliyun.openservices.ons.api.SendResult;
 import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
-import com.aliyun.teaopenapi.models.Config;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
-import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,14 +41,15 @@ public class RocketMQSinkTask extends SinkTask {
             sinkRecords.forEach(connectRecord -> {
                 Message message = new Message();
                 message.setBody(JSON.toJSONString(connectRecord.getData()).getBytes(StandardCharsets.UTF_8));
-                // TODO message.setKey();
-                // TODO message.setTag();
                 final KeyValue extensions = connectRecord.getExtensions();
                 if (extensions != null) {
+                    message.setKey(extensions.getString(RocketMQConstant.KEY));
+                    message.setTag(extensions.getString(RocketMQConstant.TAG));
                     extensions.keySet().forEach(key -> message.putUserProperties(key, extensions.getString(key)));
                 }
                 message.setTopic(topic);
-                producer.send(message);
+                final SendResult send = producer.send(message);
+                log.info("RocketMQSinkTask | put | send : {}", send);
             });
         } catch (Exception e) {
             log.error("RocketMQSinkTask | put | error => ", e);
@@ -72,30 +69,6 @@ public class RocketMQSinkTask extends SinkTask {
 
     @Override
     public void validate(KeyValue config) {
-        if (StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID))
-            || StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET))
-            || StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR))
-            || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))) {
-            throw new RuntimeException("rocketmq required parameter is null !");
-        }
-        // 检查topic是否存在
-        try {
-            Config onsConfig = new Config()
-                    .setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID))
-                    .setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET));
-            onsConfig.endpoint = OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR));
-            final Client client = new Client(onsConfig);
-            OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest()
-                    .setTopic(config.getString(RocketMQConstant.TOPIC))
-                    .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID));
-            final OnsTopicListResponse onsTopicListResponse = client.onsTopicList(onsTopicListRequest);
-            if (onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) {
-                throw new RuntimeException("rocketmq required parameter topic does not exist !");
-            }
-        } catch (Exception e) {
-            log.error("RocketMQSinkTask | validate | error => ", e);
-            throw new RuntimeException(e.getMessage());
-        }
     }
 
     @Override
@@ -118,7 +91,7 @@ public class RocketMQSinkTask extends SinkTask {
             properties.put(PropertyKeyConst.AccessKey, accessKeyId);
             properties.put(PropertyKeyConst.SecretKey, accessKeySecret);
             if (StringUtils.isNotBlank(instanceId)) {
-                properties.put(PropertyKeyConst.INSTANCE_ID,  instanceId);
+                properties.put(PropertyKeyConst.INSTANCE_ID, instanceId);
             }
             properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
             producer = ONSFactory.createProducer(properties);
diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java
index 778aa2b..3197837 100644
--- a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnector.java
@@ -1,16 +1,26 @@
 package org.apache.rocketmq.connect.rocketmq;
 
+import com.aliyun.ons20190214.Client;
+import com.aliyun.ons20190214.models.OnsGroupListRequest;
+import com.aliyun.ons20190214.models.OnsGroupListResponse;
+import com.aliyun.ons20190214.models.OnsTopicListRequest;
+import com.aliyun.ons20190214.models.OnsTopicListResponse;
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import com.aliyun.teaopenapi.models.Config;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.Task;
 import io.openmessaging.connector.api.component.task.source.SourceConnector;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
+import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class RocketMQSourceConnector extends SourceConnector {
+    private static final Logger log = LoggerFactory.getLogger(RocketMQSourceConnector.class);
 
     private String accessKeyId;
 
@@ -62,6 +72,30 @@ public class RocketMQSourceConnector extends SourceConnector {
                 || StringUtils.isBlank(config.getString(RocketMQConstant.CONSUMER_GROUP))) {
             throw new RuntimeException("rocketmq required parameter is null !");
         }
+        try {
+            Config onsConfig = new Config()
+                    .setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID))
+                    .setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET));
+            onsConfig.endpoint = OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR));
+            final Client client = new Client(onsConfig);
+            OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest()
+                    .setTopic(config.getString(RocketMQConstant.TOPIC))
+                    .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID));
+            final OnsTopicListResponse onsTopicListResponse = client.onsTopicList(onsTopicListRequest);
+            if (onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) {
+                throw new RuntimeException("rocketmq required parameter topic does not exist !");
+            }
+            OnsGroupListRequest onsGroupListRequest = new OnsGroupListRequest()
+                    .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID))
+                    .setGroupId(config.getString(RocketMQConstant.CONSUMER_GROUP));
+            final OnsGroupListResponse onsGroupListResponse = client.onsGroupList(onsGroupListRequest);
+            if (onsGroupListResponse.getBody().getData().getSubscribeInfoDo().isEmpty()) {
+                throw new RuntimeException("rocketmq required parameter consumerGroup does not exist !");
+            }
+        } catch (Exception e) {
+            log.error("RocketMQSourceConnector | validate | error => ", e);
+            throw new RuntimeException(e.getMessage());
+        }
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java
index 9de3ace..cf8def1 100644
--- a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceTask.java
@@ -1,17 +1,11 @@
 package org.apache.rocketmq.connect.rocketmq;
 
-import com.aliyun.ons20190214.Client;
-import com.aliyun.ons20190214.models.OnsGroupListRequest;
-import com.aliyun.ons20190214.models.OnsGroupListResponse;
-import com.aliyun.ons20190214.models.OnsTopicListRequest;
-import com.aliyun.ons20190214.models.OnsTopicListResponse;
 import com.aliyun.openservices.ons.api.Action;
 import com.aliyun.openservices.ons.api.Consumer;
 import com.aliyun.openservices.ons.api.ONSFactory;
 import com.aliyun.openservices.ons.api.PropertyKeyConst;
 import com.aliyun.openservices.shade.com.google.common.collect.Maps;
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
-import com.aliyun.teaopenapi.models.Config;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
@@ -19,7 +13,6 @@ import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
-import org.apache.rocketmq.connect.rocketmq.utils.OnsUtils;
 import org.assertj.core.util.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,9 +49,6 @@ public class RocketMQSourceTask extends SourceTask {
 
     @Override
     public List<ConnectRecord> poll() throws InterruptedException {
-        if (consumer == null) {
-            initConsumer();
-        }
         List<ConnectRecord> connectRecords = Lists.newArrayList();
         blockingQueue.drainTo(connectRecords, BATCH_POLL_SIZE);
         return connectRecords;
@@ -76,38 +66,6 @@ public class RocketMQSourceTask extends SourceTask {
 
     @Override
     public void validate(KeyValue config) {
-        if (StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_ID))
-                || StringUtils.isBlank(config.getString(RocketMQConstant.ACCESS_KEY_SECRET))
-                || StringUtils.isBlank(config.getString(RocketMQConstant.NAMESRV_ADDR))
-                || StringUtils.isBlank(config.getString(RocketMQConstant.TOPIC))
-                || StringUtils.isBlank(config.getString(RocketMQConstant.CONSUMER_GROUP))) {
-            throw new RuntimeException("rocketmq required parameter is null !");
-        }
-        // Check whether the topic and consumer group exist
-        try {
-            Config onsConfig = new Config()
-                    .setAccessKeyId(config.getString(RocketMQConstant.ACCESS_KEY_ID))
-                    .setAccessKeySecret(config.getString(RocketMQConstant.ACCESS_KEY_SECRET));
-            onsConfig.endpoint = OnsUtils.parseEndpoint(config.getString(RocketMQConstant.NAMESRV_ADDR));
-            final Client client = new Client(onsConfig);
-            OnsTopicListRequest onsTopicListRequest = new OnsTopicListRequest()
-                    .setTopic(config.getString(RocketMQConstant.TOPIC))
-                    .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID));
-            final OnsTopicListResponse onsTopicListResponse = client.onsTopicList(onsTopicListRequest);
-            if (onsTopicListResponse.getBody().getData().getPublishInfoDo().isEmpty()) {
-                throw new RuntimeException("rocketmq required parameter topic does not exist !");
-            }
-            OnsGroupListRequest onsGroupListRequest = new OnsGroupListRequest()
-                    .setInstanceId(config.getString(RocketMQConstant.INSTANCE_ID))
-                    .setGroupId(config.getString(RocketMQConstant.CONSUMER_GROUP));
-            final OnsGroupListResponse onsGroupListResponse = client.onsGroupList(onsGroupListRequest);
-            if (onsGroupListResponse.getBody().getData().getSubscribeInfoDo().isEmpty()) {
-                throw new RuntimeException("rocketmq required parameter consumerGroup does not exist !");
-            }
-        } catch (Exception e) {
-            log.error("RocketMQSinkTask | validate | error => ", e);
-            throw new RuntimeException(e.getMessage());
-        }
     }
 
     @Override
@@ -122,17 +80,10 @@ public class RocketMQSourceTask extends SourceTask {
 
     @Override
     public void start(SourceTaskContext sourceTaskContext) {
-        try {
-            super.start(sourceTaskContext);
-            initConsumer();
-            consumer.start();
-        } catch (Exception e) {
-            log.error("RocketMQSourceTask | start | error => ", e);
-            throw e;
-        }
+        super.start(sourceTaskContext);
     }
 
-    private void initConsumer() {
+    private void initConsumer(String tag) {
         try {
             Properties properties = new Properties();
             properties.put(PropertyKeyConst.GROUP_ID, consumerGroup);
@@ -143,9 +94,9 @@ public class RocketMQSourceTask extends SourceTask {
                 properties.put(PropertyKeyConst.INSTANCE_ID, instanceId);
             }
             consumer = ONSFactory.createConsumer(properties);
-            // TODO: ignore TAG for now
-            consumer.subscribe(topic, "*", (message, consumeContext) -> {
+            consumer.subscribe(topic, tag, (message, consumeContext) -> {
                 try {
+                    log.info("RocketMQSourceTask | commit | initConsumer | message  : {}", message);
                     Map<String, String> sourceRecordPartition = Maps.newHashMap();
                     sourceRecordPartition.put("topic", message.getTopic());
                     sourceRecordPartition.put("brokerName", message.getBornHost());
@@ -174,6 +125,19 @@ public class RocketMQSourceTask extends SourceTask {
         }
     }
 
+    @Override
+    public void commit(List<ConnectRecord> connectRecords) throws InterruptedException {
+        try {
+            if (connectRecords.isEmpty()) return;
+            final ConnectRecord connectRecord = connectRecords.get(0);
+            initConsumer(connectRecord.getExtension(RocketMQConstant.TAG));
+            consumer.start();
+        } catch (Exception e) {
+            log.error("RocketMQSourceTask | commit | error => ", e);
+            throw e;
+        }
+    }
+
     @Override
     public void stop() {
         consumer.shutdown();
diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java
index cb1f352..529fd0f 100644
--- a/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/main/java/org/apache/rocketmq/connect/rocketmq/common/RocketMQConstant.java
@@ -14,4 +14,8 @@ public class RocketMQConstant {
 
     public static final String CONSUMER_GROUP = "consumerGroup";
 
+    public static final String KEY = "KEYS";
+
+    public static final String TAG = "TAGS";
+
 }
diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java
index 897bb09..c05b19b 100644
--- a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSinkConnectorTest.java
@@ -75,7 +75,8 @@ public class RocketMQSinkConnectorTest {
         ConnectRecord connectRecord = new ConnectRecord(new RecordPartition(new HashMap<>()), new RecordOffset(new HashMap<>()), System.currentTimeMillis());
         connectRecord.setData("test message");
         connectRecords.add(connectRecord);
-        connectRecord.addExtension("key", "value");
+        connectRecord.addExtension(RocketMQConstant.KEY, "value");
+        connectRecord.addExtension(RocketMQConstant.TAG, "tag");
         rocketMQSinkTask.put(connectRecords);
     }
 
diff --git a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java
index bbdac55..cd0f2e2 100644
--- a/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java
+++ b/connectors/aliyun/rocketmq-connect-rocketmq/src/test/java/org/apache/rocketmq/connect/rocketmq/RocketMQSourceConnectorTest.java
@@ -2,12 +2,19 @@ package org.apache.rocketmq.connect.rocketmq;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.storage.OffsetStorageReader;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.connect.rocketmq.common.RocketMQConstant;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
 public class RocketMQSourceConnectorTest {
 
     @Test
@@ -44,6 +51,11 @@ public class RocketMQSourceConnectorTest {
                 return null;
             }
         });
+        List<ConnectRecord> connectRecords = new ArrayList<>(11);
+        ConnectRecord connectRecord = new ConnectRecord(new RecordPartition(new HashMap<>()), new RecordOffset(new HashMap<>()), System.currentTimeMillis());
+        connectRecord.addExtension(RocketMQConstant.TAG, "*");
+        connectRecords.add(connectRecord);
+        rocketMQSourceTask.commit(connectRecords);
         rocketMQSourceTask.poll();
         Thread.sleep(50000);
     }
diff --git a/connectors/rocketmq-connect-http/README.md b/connectors/rocketmq-connect-http/README.md
index 67b4bb8..8b6d5c7 100644
--- a/connectors/rocketmq-connect-http/README.md
+++ b/connectors/rocketmq-connect-http/README.md
@@ -15,13 +15,13 @@ mvn clean install -Dmaven.test.skip=true
 
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-sink-connector-name}
-?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector",“url”:"${url}"}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "${connect-topicname}","url":"${url}"}
 ```
 
 Example
 ```
 http://localhost:8081/connectors/httpConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","url":"192.168.1.2"}
+"connector-class":"org.apache.rocketmq.connect.http.sink.HttpSinkConnector","connect-topicname" : "http-topic","url":"192.168.1.2"}
 ```
 
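 >**Note:** `rocketmq-http-connect` depends on the `rocketmq-connect-runtime` project being started. Place all the built `jar` files in the path configured by `pluginPaths` in the `runtime` project before issuing the start request above; this value is set in the `connect.conf` file of the `runtime` project.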
@@ -38,4 +38,5 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-http-connector-name}/
 | KEY |  TYPE   | Must be filled | Description | Example          
 |-----|---------|----------------|-------------|------------------|
 | url | String  | YES            | Domain address of the sink endpoint | http://127.0.0.1 |
+|connect-topicname       | String  | YES            | Topic whose messages the sink processes | xxxx |
 
diff --git a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
index 679653e..603bafa 100644
--- a/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
+++ b/connectors/rocketmq-connect-http/src/main/java/org/apache/rocketmq/connect/http/sink/HttpSinkTask.java
@@ -1,13 +1,12 @@
 package org.apache.rocketmq.connect.http.sink;
 
-import org.apache.rocketmq.connect.http.sink.common.OkHttpUtils;
-import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.http.sink.common.OkHttpUtils;
+import org.apache.rocketmq.connect.http.sink.constant.HttpConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,9 +42,6 @@ public class HttpSinkTask extends SinkTask {
 
     @Override
     public void validate(KeyValue config) {
-        if (StringUtils.isBlank(config.getString(HttpConstant.URL_CONSTANT))) {
-            throw new RuntimeException("http required parameter is null !");
-        }
     }
 
     @Override


[rocketmq-connect] 02/02: add project.build.sourceEncoding

Posted by sh...@apache.org.

shenlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit a110c5d8531c87e6a3ef7c6b896bf2d3ffd45507
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Fri Apr 22 15:14:37 2022 +0800

    add project.build.sourceEncoding
---
 connectors/aliyun/rocketmq-connect-dingtalk/pom.xml | 1 +
 connectors/aliyun/rocketmq-connect-fc/pom.xml       | 1 +
 2 files changed, 2 insertions(+)

diff --git a/connectors/aliyun/rocketmq-connect-dingtalk/pom.xml b/connectors/aliyun/rocketmq-connect-dingtalk/pom.xml
index 48327cd..ff831d3 100644
--- a/connectors/aliyun/rocketmq-connect-dingtalk/pom.xml
+++ b/connectors/aliyun/rocketmq-connect-dingtalk/pom.xml
@@ -22,6 +22,7 @@
         <okhttp.version>3.9.1</okhttp.version>
         <fastjson.version>1.2.62</fastjson.version>
         <commons-lang3.version>3.12.0</commons-lang3.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
 
     <build>
diff --git a/connectors/aliyun/rocketmq-connect-fc/pom.xml b/connectors/aliyun/rocketmq-connect-fc/pom.xml
index bfdee64..9908092 100644
--- a/connectors/aliyun/rocketmq-connect-fc/pom.xml
+++ b/connectors/aliyun/rocketmq-connect-fc/pom.xml
@@ -21,6 +21,7 @@
         <fastjson.version>1.2.62</fastjson.version>
         <commons-lang3.version>3.12.0</commons-lang3.version>
         <aliyun-java-sdk-fc.version>1.8.25</aliyun-java-sdk-fc.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
 
     <build>