You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:27 UTC

[rocketmq-connect] 10/10: update README.md

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit ffa4cdb416016102c9367ff540381d3cd265d352
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Thu Jun 2 15:21:58 2022 +0800

    update README.md
---
 .../aliyun/rocketmq-connect-eventbridge/README.md       |  7 ++-----
 .../eventbridge/sink/EventBridgeSinkConnector.java      | 17 +----------------
 .../connect/eventbridge/sink/EventBridgeSinkTask.java   | 16 ++++------------
 .../connect/eventbridge/sink/utils/DateUtils.java       |  6 ------
 .../connect/eventbridge/sink/EventBridgeSinkTest.java   |  2 +-
 5 files changed, 8 insertions(+), 40 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index 4d3b966..c9a5251 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,14 +16,14 @@ mvn clean install -Dmaven.test.skip=true
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
 ?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","eventType":"${eventType}","eventSource":"${eventSource}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
 ```
 
 例子 
 ```
 http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
 "connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","stsEndpoint”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
-roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "eventType":"","eventSource":"","aliyuneventbusname":"xxxx"}
+roleArn”:"xxxx", "roleSessionName":"xxxx", "aliyuneventbusname":"xxxx","eventSubject":""}
 ```
 
 >**注:** `rocketmq-eventbridge-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -45,10 +45,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
 |accessKeySecret         | String  | YES             | 阿里云授信账号的SK                     | xxx |
 |roleArn                 | String  | NO             | 要扮演的RAM角色ARN。 该角色是可信实体为阿里云账号类型的RAM角色                     | xxxx |
 |roleSessionName         | String  | NO             | 角色会话名称。 该参数为用户自定义参数。通常设置为调用该API的用户身份                  | 用户名 |
-|eventTime               | String  | NO              | 事件产生的时间                          | xxxx |
 |eventSubject            | String  | YES             | 事件主题                          | xxxx |
-|eventType               | String  | YES             | 事件类型                          | xxxx |
-|eventSource             | String  | YES             | 事件源                          | xxxx |
 |aliyuneventbusname      | String  | YES             | 接收事件的事件总线名称                          | xxxx |
 |connect-topicname       | String  | YES             | sink需要处理数据消息topic                     | xxxx |
 
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 0f4af2f..57cdbb7 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -6,7 +6,6 @@ import io.openmessaging.connector.api.component.task.sink.SinkConnector;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
-import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -21,14 +20,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
 
     private String roleSessionName;
 
-    private String eventTime;
-
-    private String eventSource;
-
     private String eventSubject;
 
-    private String eventType;
-
     private String aliyuneventbusname;
 
     private String accountEndpoint;
@@ -53,13 +46,10 @@ public class EventBridgeSinkConnector extends SinkConnector {
         keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
         keyValue.put(EventBridgeConstant.ROLE_ARN, roleArn);
         keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, roleSessionName);
-        keyValue.put(EventBridgeConstant.EVENT_TIME, eventTime);
         keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
         keyValue.put(EventBridgeConstant.STS_ENDPOINT, stsEndpoint);
-        keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
-        keyValue.put(EventBridgeConstant.EVENT_SOURCE, eventSource);
         keyValueList.add(keyValue);
         return keyValueList;
     }
@@ -75,9 +65,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_SECRET))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SUBJECT))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SOURCE))) {
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SUBJECT))) {
             throw new RuntimeException("EventBridge required parameter is null !");
         }
     }
@@ -88,13 +76,10 @@ public class EventBridgeSinkConnector extends SinkConnector {
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
         roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
         roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
-        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, DateUtils.getDate());
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
         stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
-        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
-        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index 95880b2..5c58473 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 public class EventBridgeSinkTask extends SinkTask {
@@ -39,14 +40,8 @@ public class EventBridgeSinkTask extends SinkTask {
 
     private String roleSessionName;
 
-    private String eventTime;
-
     private String eventSubject;
 
-    private String eventType;
-
-    private String eventSource;
-
     private String aliyuneventbusname;
 
     private String accountEndpoint;
@@ -59,10 +54,10 @@ public class EventBridgeSinkTask extends SinkTask {
         try {
             sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
                     .withId(connectRecord.getExtension(EventBridgeConstant.EVENT_ID))
-                    .withSource(CheckUtils.checkNull(eventSource) ? URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)) : URI.create(eventSource))
-                    .withType(CheckUtils.checkNull(eventType) ? connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE) : eventType)
+                    .withSource(URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)))
+                    .withType(connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE))
                     .withSubject(eventSubject)
-                    .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
+                    .withTime(CheckUtils.checkNull(connectRecord.getExtension(EventBridgeConstant.EVENT_TIME)) ? new Date() : DateUtils.getDate(connectRecord.getExtension(EventBridgeConstant.EVENT_TIME), DateUtils.DEFAULT_DATE_FORMAT))
                     .withJsonStringData(connectRecord.getData().toString())
                     .withAliyunEventBus(aliyuneventbusname)
                     .build()));
@@ -95,13 +90,10 @@ public class EventBridgeSinkTask extends SinkTask {
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
         roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
         roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
-        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, DateUtils.getDate());
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
         stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
-        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
-        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
index 380f14e..b90911b 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
@@ -3,7 +3,6 @@ package org.apache.rocketmq.connect.eventbridge.sink.utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
@@ -21,9 +20,4 @@ public class DateUtils {
         return null;
     }
 
-    public static String getDate() {
-        DateFormat dateFormat = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
-        return dateFormat.format(new Date());
-    }
-
 }
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
index 670fd87..3561b4a 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -35,7 +35,6 @@ public class EventBridgeSinkTest {
         keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
         keyValue.put(EventBridgeConstant.ROLE_ARN, "xxxx");
         keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, "xxxx");
-        keyValue.put(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
         keyValue.put(EventBridgeConstant.EVENT_SUBJECT, "xxxx");
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, "xxxx");
         eventBridgeSinkTask.init(keyValue);
@@ -47,6 +46,7 @@ public class EventBridgeSinkTest {
         connectRecord.addExtension(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
         connectRecord.addExtension(EventBridgeConstant.EVENT_SOURCE, "xxxx");
         connectRecord.addExtension(EventBridgeConstant.EVENT_TYPE, "xxxx");
+        connectRecord.addExtension(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
         connectRecordList.add(connectRecord);
         eventBridgeSinkTask.start(new SinkTaskContext() {
             @Override