You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:27 UTC
[rocketmq-connect] 10/10: update README.md
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit ffa4cdb416016102c9367ff540381d3cd265d352
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Thu Jun 2 15:21:58 2022 +0800
update README.md
---
.../aliyun/rocketmq-connect-eventbridge/README.md | 7 ++-----
.../eventbridge/sink/EventBridgeSinkConnector.java | 17 +----------------
.../connect/eventbridge/sink/EventBridgeSinkTask.java | 16 ++++------------
.../connect/eventbridge/sink/utils/DateUtils.java | 6 ------
.../connect/eventbridge/sink/EventBridgeSinkTest.java | 2 +-
5 files changed, 8 insertions(+), 40 deletions(-)
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index 4d3b966..c9a5251 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,14 +16,14 @@ mvn clean install -Dmaven.test.skip=true
```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","eventType":"${eventType}","eventSource":"${eventSource}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"stsEndpoint":"${stsEndpoint}","accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","roleArn":"${roleArn}", "roleSessionName":"${roleSessionName}", "eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
```
例子
```
http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","stsEndpoint”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
-roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "eventType":"","eventSource":"","aliyuneventbusname":"xxxx"}
+"roleArn":"xxxx", "roleSessionName":"xxxx", "aliyuneventbusname":"xxxx","eventSubject":"xxxx"}
```
>**注:** `rocketmq-eventbridge-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -45,10 +45,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
|accessKeySecret | String | YES | 阿里云授信账号的SK | xxx |
|roleArn | String | NO | 要扮演的RAM角色ARN。 该角色是可信实体为阿里云账号类型的RAM角色 | xxxx |
|roleSessionName | String | NO | 角色会话名称。 该参数为用户自定义参数。通常设置为调用该API的用户身份 | 用户名 |
-|eventTime | String | NO | 事件产生的时间 | xxxx |
|eventSubject | String | YES | 事件主题 | xxxx |
-|eventType | String | YES | 事件类型 | xxxx |
-|eventSource | String | YES | 事件源 | xxxx |
|aliyuneventbusname | String | YES | 接收事件的事件总线名称 | xxxx |
|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx |
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 0f4af2f..57cdbb7 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -6,7 +6,6 @@ import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.internal.DefaultKeyValue;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
-import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
import java.util.ArrayList;
import java.util.List;
@@ -21,14 +20,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
private String roleSessionName;
- private String eventTime;
-
- private String eventSource;
-
private String eventSubject;
- private String eventType;
-
private String aliyuneventbusname;
private String accountEndpoint;
@@ -53,13 +46,10 @@ public class EventBridgeSinkConnector extends SinkConnector {
keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
keyValue.put(EventBridgeConstant.ROLE_ARN, roleArn);
keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, roleSessionName);
- keyValue.put(EventBridgeConstant.EVENT_TIME, eventTime);
keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
keyValue.put(EventBridgeConstant.STS_ENDPOINT, stsEndpoint);
- keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
- keyValue.put(EventBridgeConstant.EVENT_SOURCE, eventSource);
keyValueList.add(keyValue);
return keyValueList;
}
@@ -75,9 +65,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
|| StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_SECRET))
|| StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
|| StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
- || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SUBJECT))
- || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))
- || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SOURCE))) {
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SUBJECT))) {
throw new RuntimeException("EventBridge required parameter is null !");
}
}
@@ -88,13 +76,10 @@ public class EventBridgeSinkConnector extends SinkConnector {
accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
- eventTime = config.getString(EventBridgeConstant.EVENT_TIME, DateUtils.getDate());
eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
- eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
- eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
}
@Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index 95880b2..5c58473 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
public class EventBridgeSinkTask extends SinkTask {
@@ -39,14 +40,8 @@ public class EventBridgeSinkTask extends SinkTask {
private String roleSessionName;
- private String eventTime;
-
private String eventSubject;
- private String eventType;
-
- private String eventSource;
-
private String aliyuneventbusname;
private String accountEndpoint;
@@ -59,10 +54,10 @@ public class EventBridgeSinkTask extends SinkTask {
try {
sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
.withId(connectRecord.getExtension(EventBridgeConstant.EVENT_ID))
- .withSource(CheckUtils.checkNull(eventSource) ? URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)) : URI.create(eventSource))
- .withType(CheckUtils.checkNull(eventType) ? connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE) : eventType)
+ .withSource(URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)))
+ .withType(connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE))
.withSubject(eventSubject)
- .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
+ .withTime(CheckUtils.checkNull(connectRecord.getExtension(EventBridgeConstant.EVENT_TIME)) ? new Date() : DateUtils.getDate(connectRecord.getExtension(EventBridgeConstant.EVENT_TIME), DateUtils.DEFAULT_DATE_FORMAT))
.withJsonStringData(connectRecord.getData().toString())
.withAliyunEventBus(aliyuneventbusname)
.build()));
@@ -95,13 +90,10 @@ public class EventBridgeSinkTask extends SinkTask {
accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
- eventTime = config.getString(EventBridgeConstant.EVENT_TIME, DateUtils.getDate());
eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
- eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
- eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
}
@Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
index 380f14e..b90911b 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
@@ -3,7 +3,6 @@ package org.apache.rocketmq.connect.eventbridge.sink.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -21,9 +20,4 @@ public class DateUtils {
return null;
}
- public static String getDate() {
- DateFormat dateFormat = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
- return dateFormat.format(new Date());
- }
-
}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
index 670fd87..3561b4a 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -35,7 +35,6 @@ public class EventBridgeSinkTest {
keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
keyValue.put(EventBridgeConstant.ROLE_ARN, "xxxx");
keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, "xxxx");
- keyValue.put(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
keyValue.put(EventBridgeConstant.EVENT_SUBJECT, "xxxx");
keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, "xxxx");
eventBridgeSinkTask.init(keyValue);
@@ -47,6 +46,7 @@ public class EventBridgeSinkTest {
connectRecord.addExtension(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
connectRecord.addExtension(EventBridgeConstant.EVENT_SOURCE, "xxxx");
connectRecord.addExtension(EventBridgeConstant.EVENT_TYPE, "xxxx");
+ connectRecord.addExtension(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
connectRecordList.add(connectRecord);
eventBridgeSinkTask.start(new SinkTaskContext() {
@Override