You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:21 UTC
[rocketmq-connect] 04/10: add eventType
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit d516617faf27d2b671ada1c557ae89a73d0cc808
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Tue May 31 10:39:54 2022 +0800
add eventType
---
.../aliyun/rocketmq-connect-eventbridge/README.md | 23 +++++++------
.../eventbridge/sink/EventBridgeSinkConnector.java | 7 +++-
.../eventbridge/sink/EventBridgeSinkTask.java | 40 ++++++++++++++--------
3 files changed, 43 insertions(+), 27 deletions(-)
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index f22b5fe..8e5ab33 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,14 +16,14 @@ mvn clean install -Dmaven.test.skip=true
```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector","connect-topicname":"${connect-topicname}"
-,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"stsEndpoint":"${stsEndpoint}","accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","roleArn":"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","eventType":"${eventType}","aliyuneventbusname":"${aliyuneventbusname}"}
```
例子
```
http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector","connect-topicname":"eventbridge-topic","stsEndpoint":"xxxx","accessKeyId":"xxxx","accessKeySecret":"xxxx",
-roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
+"roleArn":"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "eventType":"","aliyuneventbusname":"xxxx"}
```
>**注:** `rocketmq-eventbridge-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -39,13 +39,14 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
| KEY | TYPE | Must be filled | Description | Example
|------------------------|---------|----------------|----------------------------------|--|
-|stsEndpoint | String | YES | STS endpoint | xxxx|
-|accessKeyId | String | YES | 阿里云授信账号的AK | xxxx |
-|accessKeySecret | String | YES | 阿里云授信账号的SK | xxx |
-|roleArn | String | YES | 要扮演的RAM角色ARN。 该角色是可信实体为阿里云账号类型的RAM角色 | xxxx |
-|roleSessionName | String | YES | 角色会话名称。 该参数为用户自定义参数。通常设置为调用该API的用户身份 | 用户名 |
-|eventTime | String | YES | 事件产生的时间 | xxxx |
-|eventSubject | String | NO | 事件主题 | xxxx |
-|aliyuneventbusname | String | YES | 接收事件的事件总线名称 | xxxx |
-|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx |
+|stsEndpoint | String | YES | STS endpoint | xxxx|
+|accessKeyId | String | YES | 阿里云授信账号的AK | xxxx |
+|accessKeySecret | String | YES | 阿里云授信账号的SK | xxx |
+|roleArn | String | YES | 要扮演的RAM角色ARN。 该角色是可信实体为阿里云账号类型的RAM角色 | xxxx |
+|roleSessionName | String | YES | 角色会话名称。 该参数为用户自定义参数。通常设置为调用该API的用户身份 | 用户名 |
+|eventTime | String | NO | 事件产生的时间 | xxxx |
+|eventSubject | String | YES | 事件主题 | xxxx |
+|eventType | String | YES | 事件类型 | xxxx |
+|aliyuneventbusname | String | YES | 接收事件的事件总线名称 | xxxx |
+|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx |
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 1a7a0b1..cc0d0a5 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -8,6 +8,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
public class EventBridgeSinkConnector extends SinkConnector {
@@ -24,6 +25,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
private String eventSubject;
+ private String eventType;
+
private String aliyuneventbusname;
private String accountEndpoint;
@@ -53,6 +56,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
keyValue.put(EventBridgeConstant.STS_ENDPOINT, stsEndpoint);
+ keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
keyValueList.add(keyValue);
return keyValueList;
}
@@ -82,11 +86,12 @@ public class EventBridgeSinkConnector extends SinkConnector {
accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
- eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+ eventTime = config.getString(EventBridgeConstant.EVENT_TIME, new Date().toString());
eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
+ eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
}
@Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index aaeac3a..c1ce188 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -16,6 +16,7 @@ import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
import org.slf4j.Logger;
@@ -23,6 +24,7 @@ import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
public class EventBridgeSinkTask extends SinkTask {
@@ -42,6 +44,8 @@ public class EventBridgeSinkTask extends SinkTask {
private String eventSubject;
+ private String eventType;
+
private String aliyuneventbusname;
private String accountEndpoint;
@@ -55,7 +59,7 @@ public class EventBridgeSinkTask extends SinkTask {
sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
.withId(connectRecord.getExtension(EventBridgeConstant.EVENT_ID))
.withSource(URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)))
- .withType(connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE))
+ .withType(StringUtils.isBlank(eventType) ? connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE) : eventType)
.withSubject(eventSubject)
.withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
.withJsonStringData(connectRecord.getData().toString())
@@ -90,31 +94,37 @@ public class EventBridgeSinkTask extends SinkTask {
accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
- eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+ eventTime = config.getString(EventBridgeConstant.EVENT_TIME, new Date().toString());
eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
+ eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
}
@Override
public void start(SinkTaskContext sinkTaskContext) {
super.start(sinkTaskContext);
try {
- DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
- DefaultProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret);
- IAcsClient client = new DefaultAcsClient(profile);
- AssumeRoleWithServiceIdentityRequest request = new AssumeRoleWithServiceIdentityRequest();
- request.setRoleArn(roleArn);
- request.setRoleSessionName(roleSessionName);
- request.setAssumeRoleFor(roleSessionName);
- request.setAcceptFormat(FormatType.JSON);
- request.setDurationSeconds(3600L);
- final AssumeRoleWithServiceIdentityResponse response = client.getAcsResponse(request);
Config authConfig = new Config();
- authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
- authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
- authConfig.securityToken = response.getCredentials().getSecurityToken();
+ if (StringUtils.isNotBlank(roleArn) && StringUtils.isNotBlank(roleSessionName)) {
+ DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
+ DefaultProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret);
+ IAcsClient client = new DefaultAcsClient(profile);
+ AssumeRoleWithServiceIdentityRequest request = new AssumeRoleWithServiceIdentityRequest();
+ request.setRoleArn(roleArn);
+ request.setRoleSessionName(roleSessionName);
+ request.setAssumeRoleFor(roleSessionName);
+ request.setAcceptFormat(FormatType.JSON);
+ request.setDurationSeconds(3600L);
+ final AssumeRoleWithServiceIdentityResponse response = client.getAcsResponse(request);
+ authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
+ authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
+ authConfig.securityToken = response.getCredentials().getSecurityToken();
+ } else {
+ authConfig.accessKeyId = accessKeyId;
+ authConfig.accessKeySecret = accessKeySecret;
+ }
authConfig.endpoint = accountEndpoint;
eventBridgeClient = new EventBridgeClient(authConfig);
} catch (Exception e) {