You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:20 UTC

[rocketmq-connect] 03/10: update eventbridge sts init

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 02292146999b1f47d49553c91a3cb0b120a14ea8
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Mon May 9 14:23:28 2022 +0800

    update eventbridge sts init
---
 .../aliyun/rocketmq-connect-eventbridge/README.md  |  6 +++---
 .../aliyun/rocketmq-connect-eventbridge/pom.xml    | 10 +++++++--
 .../eventbridge/sink/EventBridgeSinkConnector.java | 10 ++++-----
 .../eventbridge/sink/EventBridgeSinkTask.java      | 24 +++++++++++++---------
 .../sink/constant/EventBridgeConstant.java         |  2 +-
 .../eventbridge/sink/EventBridgeSinkTest.java      |  2 +-
 6 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index f6d9489..f22b5fe 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,13 +16,13 @@ mvn clean install -Dmaven.test.skip=true
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
 ?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,"regionId”:"${regionId}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
 ```
 
 例子 
 ```
 http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","regionId”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
+"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","stsEndpoint”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
 roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
 ```
 
@@ -39,7 +39,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
 
 |         KEY            |  TYPE   | Must be filled | Description                      | Example
 |------------------------|---------|----------------|----------------------------------|--|
-|regionId                | String  | YES            | 地域                               | cn-hangzhou|
+|stsEndpoint             | String  | YES            | STS endpoint                       | xxxx|
 |accessKeyId             | String  | YES            | 阿里云授信账号的AK                    | xxxx |
 |accessKeySecret         | String  | YES            | 阿里云授信账号的SK                     | xxx |
 |roleArn                 | String  | YES            | 要扮演的RAM角色ARN。 该角色是可信实体为阿里云账号类型的RAM角色                     | xxxx |
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
index 4730384..ba1875c 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
@@ -21,8 +21,9 @@
         <commons-lang3.version>3.12.0</commons-lang3.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <eventbridge-client.version>1.3.5</eventbridge-client.version>
-        <aliyun-java-sdk-sts.version>3.1.0</aliyun-java-sdk-sts.version>
-        <aliyun-java-sdk-core.version>4.6.0</aliyun-java-sdk-core.version>
+        <aliyun-java-sdk-sts.version>2.2.0</aliyun-java-sdk-sts.version>
+        <aliyun-java-sdk-core.version>4.5.24</aliyun-java-sdk-core.version>
+        <aliyun-java-sdk-sts-internal.version>3.0.0</aliyun-java-sdk-sts-internal.version>
         <gson.version>2.9.0</gson.version>
     </properties>
 
@@ -207,6 +208,11 @@
             <artifactId>aliyun-java-sdk-sts</artifactId>
             <version>${aliyun-java-sdk-sts.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-sts-internal</artifactId>
+            <version>${aliyun-java-sdk-sts-internal.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 967c50b..1a7a0b1 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -12,8 +12,6 @@ import java.util.List;
 
 public class EventBridgeSinkConnector extends SinkConnector {
 
-    private String regionId;
-
     private String accessKeyId;
 
     private String accessKeySecret;
@@ -30,6 +28,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
 
     private String accountEndpoint;
 
+    private String stsEndpoint;
+
     @Override
     public void pause() {
 
@@ -44,7 +44,6 @@ public class EventBridgeSinkConnector extends SinkConnector {
     public List<KeyValue> taskConfigs(int maxTasks) {
         List<KeyValue> keyValueList = new ArrayList<>(11);
         KeyValue keyValue = new DefaultKeyValue();
-        keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, regionId);
         keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, accessKeyId);
         keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
         keyValue.put(EventBridgeConstant.ROLE_ARN, roleArn);
@@ -53,6 +52,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
         keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
+        keyValue.put(EventBridgeConstant.STS_ENDPOINT, stsEndpoint);
         keyValueList.add(keyValue);
         return keyValueList;
     }
@@ -71,7 +71,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_SESSION_NAME))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TIME))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.REGION_ID_CONSTANT))) {
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.STS_ENDPOINT))) {
             throw new RuntimeException("EventBridge required parameter is null !");
         }
     }
@@ -85,8 +85,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
         eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
-        regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+        stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index 28eba62..aaeac3a 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -7,9 +7,10 @@ import com.aliyun.eventbridge.models.PutEventsResponse;
 import com.aliyun.eventbridge.util.EventBuilder;
 import com.aliyuncs.DefaultAcsClient;
 import com.aliyuncs.IAcsClient;
+import com.aliyuncs.http.FormatType;
 import com.aliyuncs.profile.DefaultProfile;
-import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
-import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleWithServiceIdentityRequest;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleWithServiceIdentityResponse;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
@@ -27,12 +28,12 @@ import java.util.List;
 public class EventBridgeSinkTask extends SinkTask {
     private static final Logger log = LoggerFactory.getLogger(EventBridgeSinkTask.class);
 
-    private String regionId;
-
     private String accessKeyId;
 
     private String accessKeySecret;
 
+    private String stsEndpoint;
+
     private String roleArn;
 
     private String roleSessionName;
@@ -61,7 +62,7 @@ public class EventBridgeSinkTask extends SinkTask {
                     .withAliyunEventBus(aliyuneventbusname)
                     .build()));
             PutEventsResponse putEventsResponse = eventBridgeClient.putEvents(cloudEventList);
-            log.info("EventBridgeSinkTask | put | putEventsResponse | entryList : {} | requestId : {}", putEventsResponse.getEntryList(), putEventsResponse.getRequestId());
+            log.info("EventBridgeSinkTask | put | putEventsResponse | eventId : {} | traceId : {} | requestId : {}", putEventsResponse.getEntryList().get(0).getEventId(), putEventsResponse.getEntryList().get(0).getTraceId(), putEventsResponse.getRequestId());
         } catch (Exception e) {
             log.error("EventBridgeSinkTask | put | error => ", e);
             throw new RuntimeException(e);
@@ -92,21 +93,24 @@ public class EventBridgeSinkTask extends SinkTask {
         eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
-        regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+        stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
     }
 
     @Override
     public void start(SinkTaskContext sinkTaskContext) {
         super.start(sinkTaskContext);
         try {
-            DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKeyId, accessKeySecret);
+            DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
+            DefaultProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret);
             IAcsClient client = new DefaultAcsClient(profile);
-            AssumeRoleRequest request = new AssumeRoleRequest();
-            request.setRegionId(regionId);
+            AssumeRoleWithServiceIdentityRequest request = new AssumeRoleWithServiceIdentityRequest();
             request.setRoleArn(roleArn);
             request.setRoleSessionName(roleSessionName);
-            AssumeRoleResponse response = client.getAcsResponse(request);
+            request.setAssumeRoleFor(roleSessionName);
+            request.setAcceptFormat(FormatType.JSON);
+            request.setDurationSeconds(3600L);
+            final AssumeRoleWithServiceIdentityResponse response = client.getAcsResponse(request);
             Config authConfig = new Config();
             authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
             authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
index 9ae33d0..8c96842 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
@@ -13,5 +13,5 @@ public class EventBridgeConstant {
     public static final String EVENT_TYPE = "type";
     public static final String EVENT_SUBJECT = "eventSubject";
     public static final String ALIYUN_EVENT_BUS_NAME = "aliyuneventbusname";
-    public static final String REGION_ID_CONSTANT = "regionId";
+    public static final String STS_ENDPOINT = "stsEndpoint";
 }
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
index 5add642..670fd87 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -29,8 +29,8 @@ public class EventBridgeSinkTest {
     public void testPut() {
         EventBridgeSinkTask eventBridgeSinkTask = new EventBridgeSinkTask();
         KeyValue keyValue = new DefaultKeyValue();
-        keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, "xxxx");
         keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, "xxxx");
+        keyValue.put(EventBridgeConstant.STS_ENDPOINT, "xxxx");
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, "xxxx");
         keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
         keyValue.put(EventBridgeConstant.ROLE_ARN, "xxxx");