You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:19 UTC

[rocketmq-connect] 02/10: update eventbridge sts init

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 26ff79e93cf06dce11c913b90fb0ab0d96a24c8f
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Wed Apr 27 11:16:38 2022 +0800

    update eventbridge sts init
---
 .../aliyun/rocketmq-connect-eventbridge/README.md  | 18 ++++-----
 .../aliyun/rocketmq-connect-eventbridge/pom.xml    | 18 +++++++++
 .../eventbridge/sink/EventBridgeSinkConnector.java | 36 +++++++++---------
 .../eventbridge/sink/EventBridgeSinkTask.java      | 43 ++++++++++++++--------
 .../sink/constant/EventBridgeConstant.java         |  9 +++--
 .../eventbridge/sink/EventBridgeSinkTest.java      | 11 ++++--
 6 files changed, 85 insertions(+), 50 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index d4eca92..f6d9489 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,15 +16,14 @@ mvn clean install -Dmaven.test.skip=true
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
 ?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}","eventId":"${eventId}","eventSource":"${eventSource}","eventType":"${eventType}", 
-"eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"regionId":"${regionId}","accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","roleArn":"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
 ```
 
 例子 
 ```
 http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",accountEndpoint”:"xxxx","eventId":"xxxx",
-"eventSource":"xxxx","eventType":"", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
+"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector","connect-topicname":"eventbridge-topic","regionId":"xxxx","accessKeyId":"xxxx","accessKeySecret":"xxxx",
+"roleArn":"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
 ```
 
 >**注:** `rocketmq-eventbridge-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -40,12 +39,11 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
 
 |         KEY            |  TYPE   | Must be filled | Description                      | Example
 |------------------------|---------|----------------|----------------------------------|--|
-|accessKeyId             | String  | YES            | 阿里云身份验证,在阿里云用户信息管理控制台获取                    | xxxx |
-|accessKeySecret         | String  | YES            | 阿里云身份验证,在阿里云用户信息管理控制台获取                     | xxx |
-|accountEndpoint         | String  | YES            | 阿里云EventBridge官方接入点                     | xxxx |
-|eventId                 | String  | YES            | 事件ID | xxxx |
-|eventSource             | String  | YES            | 事件源 | xxxx |
-|eventType               | String | YES             | 事件类型                           | null |
+|regionId                | String  | YES            | 地域                               | cn-hangzhou|
+|accessKeyId             | String  | YES            | 阿里云授信账号的AK                    | xxxx |
+|accessKeySecret         | String  | YES            | 阿里云授信账号的SK                     | xxx |
+|roleArn                 | String  | YES            | 要扮演的RAM角色ARN。 该角色是可信实体为阿里云账号类型的RAM角色                     | xxxx |
+|roleSessionName         | String  | YES            | 角色会话名称。 该参数为用户自定义参数。通常设置为调用该API的用户身份                  | 用户名 |
 |eventTime               | String | YES             | 事件产生的时间                          | xxxx |
 |eventSubject            | String | NO              | 事件主题                          | xxxx |
 |aliyuneventbusname      | String | YES             | 接收事件的事件总线名称                          | xxxx |
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
index 469d1fe..4730384 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
@@ -21,6 +21,9 @@
         <commons-lang3.version>3.12.0</commons-lang3.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <eventbridge-client.version>1.3.5</eventbridge-client.version>
+        <aliyun-java-sdk-sts.version>3.1.0</aliyun-java-sdk-sts.version>
+        <aliyun-java-sdk-core.version>4.6.0</aliyun-java-sdk-core.version>
+        <gson.version>2.9.0</gson.version>
     </properties>
 
     <build>
@@ -189,6 +192,21 @@
             <artifactId>eventbridge-client</artifactId>
             <version>${eventbridge-client.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>${gson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-core</artifactId>
+            <version>${aliyun-java-sdk-core.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-sts</artifactId>
+            <version>${aliyun-java-sdk-sts.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index f986a0a..967c50b 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -12,24 +12,24 @@ import java.util.List;
 
 public class EventBridgeSinkConnector extends SinkConnector {
 
+    private String regionId;
+
     private String accessKeyId;
 
     private String accessKeySecret;
 
-    private String accountEndpoint;
-
-    private String eventId;
+    private String roleArn;
 
-    private String eventSource;
+    private String roleSessionName;
 
     private String eventTime;
 
-    private String eventType;
-
     private String eventSubject;
 
     private String aliyuneventbusname;
 
+    private String accountEndpoint;
+
     @Override
     public void pause() {
 
@@ -44,15 +44,15 @@ public class EventBridgeSinkConnector extends SinkConnector {
     public List<KeyValue> taskConfigs(int maxTasks) {
         List<KeyValue> keyValueList = new ArrayList<>(11);
         KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, regionId);
         keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, accessKeyId);
         keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
-        keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
-        keyValue.put(EventBridgeConstant.EVENT_ID, eventId);
-        keyValue.put(EventBridgeConstant.EVENT_SOURCE, eventSource);
+        keyValue.put(EventBridgeConstant.ROLE_ARN, roleArn);
+        keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, roleSessionName);
         keyValue.put(EventBridgeConstant.EVENT_TIME, eventTime);
-        keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
         keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
+        keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
         keyValueList.add(keyValue);
         return keyValueList;
     }
@@ -67,11 +67,11 @@ public class EventBridgeSinkConnector extends SinkConnector {
         if (StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_ID))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_SECRET))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_ID))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SOURCE))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_ARN))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_SESSION_NAME))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TIME))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))) {
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.REGION_ID_CONSTANT))) {
             throw new RuntimeException("EventBridge required parameter is null !");
         }
     }
@@ -80,13 +80,13 @@ public class EventBridgeSinkConnector extends SinkConnector {
     public void init(KeyValue config) {
         accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
-        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
-        eventId = config.getString(EventBridgeConstant.EVENT_ID);
-        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+        roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
+        roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
         eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
-        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+        regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
+        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index 3af4564..28eba62 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -5,6 +5,11 @@ import com.aliyun.eventbridge.models.CloudEvent;
 import com.aliyun.eventbridge.models.Config;
 import com.aliyun.eventbridge.models.PutEventsResponse;
 import com.aliyun.eventbridge.util.EventBuilder;
+import com.aliyuncs.DefaultAcsClient;
+import com.aliyuncs.IAcsClient;
+import com.aliyuncs.profile.DefaultProfile;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
@@ -22,24 +27,24 @@ import java.util.List;
 public class EventBridgeSinkTask extends SinkTask {
     private static final Logger log = LoggerFactory.getLogger(EventBridgeSinkTask.class);
 
+    private String regionId;
+
     private String accessKeyId;
 
     private String accessKeySecret;
 
-    private String accountEndpoint;
-
-    private String eventId;
+    private String roleArn;
 
-    private String eventSource;
+    private String roleSessionName;
 
     private String eventTime;
 
-    private String eventType;
-
     private String eventSubject;
 
     private String aliyuneventbusname;
 
+    private String accountEndpoint;
+
     private EventBridgeClient eventBridgeClient;
 
     @Override
@@ -47,9 +52,9 @@ public class EventBridgeSinkTask extends SinkTask {
         List<CloudEvent> cloudEventList = new ArrayList<>();
         try {
             sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
-                    .withId(eventId)
-                    .withSource(URI.create(eventSource))
-                    .withType(eventType)
+                    .withId(connectRecord.getExtension(EventBridgeConstant.EVENT_ID))
+                    .withSource(URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)))
+                    .withType(connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE))
                     .withSubject(eventSubject)
                     .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
                     .withJsonStringData(connectRecord.getData().toString())
@@ -82,22 +87,30 @@ public class EventBridgeSinkTask extends SinkTask {
     public void init(KeyValue config) {
         accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
-        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
-        eventId = config.getString(EventBridgeConstant.EVENT_ID);
-        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+        roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
+        roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
         eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
-        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+        regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
+        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
     }
 
     @Override
     public void start(SinkTaskContext sinkTaskContext) {
         super.start(sinkTaskContext);
         try {
+            DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKeyId, accessKeySecret);
+            IAcsClient client = new DefaultAcsClient(profile);
+            AssumeRoleRequest request = new AssumeRoleRequest();
+            request.setRegionId(regionId);
+            request.setRoleArn(roleArn);
+            request.setRoleSessionName(roleSessionName);
+            AssumeRoleResponse response = client.getAcsResponse(request);
             Config authConfig = new Config();
-            authConfig.accessKeyId = accessKeyId;
-            authConfig.accessKeySecret = accessKeySecret;
+            authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
+            authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
+            authConfig.securityToken = response.getCredentials().getSecurityToken();
             authConfig.endpoint = accountEndpoint;
             eventBridgeClient = new EventBridgeClient(authConfig);
         } catch (Exception e) {
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
index 51a7508..9ae33d0 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
@@ -5,10 +5,13 @@ public class EventBridgeConstant {
     public static final String ACCESS_KEY_ID = "accessKeyId";
     public static final String ACCESS_KEY_SECRET = "accessKeySecret";
     public static final String ACCOUNT_ENDPOINT = "accountEndpoint";
-    public static final String EVENT_ID = "eventId";
-    public static final String EVENT_SOURCE = "eventSource";
+    public static final String ROLE_ARN = "roleArn";
+    public static final String ROLE_SESSION_NAME = "roleSessionName";
+    public static final String EVENT_ID = "id";
+    public static final String EVENT_SOURCE = "source";
     public static final String EVENT_TIME = "eventTime";
-    public static final String EVENT_TYPE = "eventType";
+    public static final String EVENT_TYPE = "type";
     public static final String EVENT_SUBJECT = "eventSubject";
     public static final String ALIYUN_EVENT_BUS_NAME = "aliyuneventbusname";
+    public static final String REGION_ID_CONSTANT = "regionId";
 }
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
index 030794c..5add642 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -29,13 +29,13 @@ public class EventBridgeSinkTest {
     public void testPut() {
         EventBridgeSinkTask eventBridgeSinkTask = new EventBridgeSinkTask();
         KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, "xxxx");
         keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, "xxxx");
-        keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, "xxxx");
-        keyValue.put(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
-        keyValue.put(EventBridgeConstant.EVENT_SOURCE, "xxxx");
+        keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
+        keyValue.put(EventBridgeConstant.ROLE_ARN, "xxxx");
+        keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, "xxxx");
         keyValue.put(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
-        keyValue.put(EventBridgeConstant.EVENT_TYPE, "xxxx");
         keyValue.put(EventBridgeConstant.EVENT_SUBJECT, "xxxx");
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, "xxxx");
         eventBridgeSinkTask.init(keyValue);
@@ -44,6 +44,9 @@ public class EventBridgeSinkTest {
         connectRecord.setData("{\n" +
                 "\t\"test\" :  \"test\"\n" +
                 "}");
+        connectRecord.addExtension(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
+        connectRecord.addExtension(EventBridgeConstant.EVENT_SOURCE, "xxxx");
+        connectRecord.addExtension(EventBridgeConstant.EVENT_TYPE, "xxxx");
         connectRecordList.add(connectRecord);
         eventBridgeSinkTask.start(new SinkTaskContext() {
             @Override