You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:21 UTC

[rocketmq-connect] 04/10: add eventType

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit d516617faf27d2b671ada1c557ae89a73d0cc808
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Tue May 31 10:39:54 2022 +0800

    add eventType
---
 .../aliyun/rocketmq-connect-eventbridge/README.md  | 23 +++++++------
 .../eventbridge/sink/EventBridgeSinkConnector.java |  7 +++-
 .../eventbridge/sink/EventBridgeSinkTask.java      | 40 ++++++++++++++--------
 3 files changed, 43 insertions(+), 27 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index f22b5fe..8e5ab33 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,14 +16,14 @@ mvn clean install -Dmaven.test.skip=true
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
 ?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"stsEndpoint":"${stsEndpoint}","accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","roleArn":"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","eventType":"${eventType}","aliyuneventbusname":"${aliyuneventbusname}"}
 ```
 
 例子 
 ```
 http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
 "connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","stsEndpoint”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
-roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
+"roleArn":"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "eventType":"","aliyuneventbusname":"xxxx"}
 ```
 
 >**注:** `rocketmq-eventbridge-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -39,13 +39,14 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
 
 |         KEY            |  TYPE   | Must be filled | Description                      | Example
 |------------------------|---------|----------------|----------------------------------|--|
-|stsEndpoint             | String  | YES            | STS endpoint                       | xxxx|
-|accessKeyId             | String  | YES            | 阿里云授信账号的AK                    | xxxx |
-|accessKeySecret         | String  | YES            | 阿里云授信账号的SK                     | xxx |
-|roleArn                 | String  | YES            | 要扮演的RAM角色ARN。 该角色是可信实体为阿里云账号类型的RAM角色                     | xxxx |
-|roleSessionName         | String  | YES            | 角色会话名称。 该参数为用户自定义参数。通常设置为调用该API的用户身份                  | 用户名 |
-|eventTime               | String | YES             | 事件产生的时间                          | xxxx |
-|eventSubject            | String | NO              | 事件主题                          | xxxx |
-|aliyuneventbusname      | String | YES             | 接收事件的事件总线名称                          | xxxx |
-|connect-topicname       | String  | YES            | sink需要处理数据消息topic                     | xxxx |
+|stsEndpoint             | String  | YES             | STS endpoint                       | xxxx|
+|accessKeyId             | String  | YES             | 阿里云授信账号的AK                    | xxxx |
+|accessKeySecret         | String  | YES             | 阿里云授信账号的SK                     | xxx |
+|roleArn                 | String  | YES             | 要扮演的RAM角色ARN。 该角色是可信实体为阿里云账号类型的RAM角色                     | xxxx |
+|roleSessionName         | String  | YES             | 角色会话名称。 该参数为用户自定义参数。通常设置为调用该API的用户身份                  | 用户名 |
+|eventTime               | String  | NO              | 事件产生的时间                          | xxxx |
+|eventSubject            | String  | YES             | 事件主题                          | xxxx |
+|eventType               | String  | NO              | 事件类型；为空时使用消息记录扩展属性中的事件类型                          | xxxx |
+|aliyuneventbusname      | String  | YES             | 接收事件的事件总线名称                          | xxxx |
+|connect-topicname       | String  | YES             | sink需要处理数据消息topic                     | xxxx |
 
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 1a7a0b1..cc0d0a5 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -8,6 +8,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
 
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 public class EventBridgeSinkConnector extends SinkConnector {
@@ -24,6 +25,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
 
     private String eventSubject;
 
+    private String eventType;
+
     private String aliyuneventbusname;
 
     private String accountEndpoint;
@@ -53,6 +56,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
         keyValue.put(EventBridgeConstant.STS_ENDPOINT, stsEndpoint);
+        keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
         keyValueList.add(keyValue);
         return keyValueList;
     }
@@ -82,11 +86,12 @@ public class EventBridgeSinkConnector extends SinkConnector {
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
         roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
         roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
-        eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, new Date().toString());
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
         stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
+        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index aaeac3a..c1ce188 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -16,6 +16,7 @@ import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
 import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
 import org.slf4j.Logger;
@@ -23,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 public class EventBridgeSinkTask extends SinkTask {
@@ -42,6 +44,8 @@ public class EventBridgeSinkTask extends SinkTask {
 
     private String eventSubject;
 
+    private String eventType;
+
     private String aliyuneventbusname;
 
     private String accountEndpoint;
@@ -55,7 +59,7 @@ public class EventBridgeSinkTask extends SinkTask {
             sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
                     .withId(connectRecord.getExtension(EventBridgeConstant.EVENT_ID))
                     .withSource(URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)))
-                    .withType(connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE))
+                    .withType(StringUtils.isBlank(eventType) ? connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE) : eventType)
                     .withSubject(eventSubject)
                     .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
                     .withJsonStringData(connectRecord.getData().toString())
@@ -90,31 +94,37 @@ public class EventBridgeSinkTask extends SinkTask {
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
         roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
         roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
-        eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, new Date().toString());
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
         stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
+        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
     }
 
     @Override
     public void start(SinkTaskContext sinkTaskContext) {
         super.start(sinkTaskContext);
         try {
-            DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
-            DefaultProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret);
-            IAcsClient client = new DefaultAcsClient(profile);
-            AssumeRoleWithServiceIdentityRequest request = new AssumeRoleWithServiceIdentityRequest();
-            request.setRoleArn(roleArn);
-            request.setRoleSessionName(roleSessionName);
-            request.setAssumeRoleFor(roleSessionName);
-            request.setAcceptFormat(FormatType.JSON);
-            request.setDurationSeconds(3600L);
-            final AssumeRoleWithServiceIdentityResponse response = client.getAcsResponse(request);
             Config authConfig = new Config();
-            authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
-            authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
-            authConfig.securityToken = response.getCredentials().getSecurityToken();
+            if (StringUtils.isNotBlank(roleArn) && StringUtils.isNotBlank(roleSessionName)) {
+                DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
+                DefaultProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret);
+                IAcsClient client = new DefaultAcsClient(profile);
+                AssumeRoleWithServiceIdentityRequest request = new AssumeRoleWithServiceIdentityRequest();
+                request.setRoleArn(roleArn);
+                request.setRoleSessionName(roleSessionName);
+                request.setAssumeRoleFor(roleSessionName);
+                request.setAcceptFormat(FormatType.JSON);
+                request.setDurationSeconds(3600L);
+                final AssumeRoleWithServiceIdentityResponse response = client.getAcsResponse(request);
+                authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
+                authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
+                authConfig.securityToken = response.getCredentials().getSecurityToken();
+            } else {
+                authConfig.accessKeyId = accessKeyId;
+                authConfig.accessKeySecret = accessKeySecret;
+            }
             authConfig.endpoint = accountEndpoint;
             eventBridgeClient = new EventBridgeClient(authConfig);
         } catch (Exception e) {