You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:20 UTC
[rocketmq-connect] 03/10: update eventbridge sts init
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 02292146999b1f47d49553c91a3cb0b120a14ea8
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Mon May 9 14:23:28 2022 +0800
update eventbridge sts init
---
.../aliyun/rocketmq-connect-eventbridge/README.md | 6 +++---
.../aliyun/rocketmq-connect-eventbridge/pom.xml | 10 +++++++--
.../eventbridge/sink/EventBridgeSinkConnector.java | 10 ++++-----
.../eventbridge/sink/EventBridgeSinkTask.java | 24 +++++++++++++---------
.../sink/constant/EventBridgeConstant.java | 2 +-
.../eventbridge/sink/EventBridgeSinkTest.java | 2 +-
6 files changed, 32 insertions(+), 22 deletions(-)
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index f6d9489..f22b5fe 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,13 +16,13 @@ mvn clean install -Dmaven.test.skip=true
```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,"regionId”:"${regionId}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
```
例子
```
http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","regionId”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
+"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","stsEndpoint”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
```
@@ -39,7 +39,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
| KEY | TYPE | Must be filled | Description | Example
|------------------------|---------|----------------|----------------------------------|--|
-|regionId | String | YES | 地域 | cn-hangzhou|
+|stsEndpoint | String | YES | STS endpoint | xxxx|
|accessKeyId | String | YES | 阿里云授信账号的AK | xxxx |
|accessKeySecret | String | YES | 阿里云授信账号的SK | xxx |
|roleArn | String | YES | 要扮演的RAM角色ARN。 该角色是可信实体为阿里云账号类型的RAM角色 | xxxx |
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
index 4730384..ba1875c 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
@@ -21,8 +21,9 @@
<commons-lang3.version>3.12.0</commons-lang3.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<eventbridge-client.version>1.3.5</eventbridge-client.version>
- <aliyun-java-sdk-sts.version>3.1.0</aliyun-java-sdk-sts.version>
- <aliyun-java-sdk-core.version>4.6.0</aliyun-java-sdk-core.version>
+ <aliyun-java-sdk-sts.version>2.2.0</aliyun-java-sdk-sts.version>
+ <aliyun-java-sdk-core.version>4.5.24</aliyun-java-sdk-core.version>
+ <aliyun-java-sdk-sts-internal.version>3.0.0</aliyun-java-sdk-sts-internal.version>
<gson.version>2.9.0</gson.version>
</properties>
@@ -207,6 +208,11 @@
<artifactId>aliyun-java-sdk-sts</artifactId>
<version>${aliyun-java-sdk-sts.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.aliyun</groupId>
+ <artifactId>aliyun-java-sdk-sts-internal</artifactId>
+ <version>${aliyun-java-sdk-sts-internal.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 967c50b..1a7a0b1 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -12,8 +12,6 @@ import java.util.List;
public class EventBridgeSinkConnector extends SinkConnector {
- private String regionId;
-
private String accessKeyId;
private String accessKeySecret;
@@ -30,6 +28,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
private String accountEndpoint;
+ private String stsEndpoint;
+
@Override
public void pause() {
@@ -44,7 +44,6 @@ public class EventBridgeSinkConnector extends SinkConnector {
public List<KeyValue> taskConfigs(int maxTasks) {
List<KeyValue> keyValueList = new ArrayList<>(11);
KeyValue keyValue = new DefaultKeyValue();
- keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, regionId);
keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, accessKeyId);
keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
keyValue.put(EventBridgeConstant.ROLE_ARN, roleArn);
@@ -53,6 +52,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
+ keyValue.put(EventBridgeConstant.STS_ENDPOINT, stsEndpoint);
keyValueList.add(keyValue);
return keyValueList;
}
@@ -71,7 +71,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
|| StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_SESSION_NAME))
|| StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TIME))
|| StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
- || StringUtils.isBlank(config.getString(EventBridgeConstant.REGION_ID_CONSTANT))) {
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.STS_ENDPOINT))) {
throw new RuntimeException("EventBridge required parameter is null !");
}
}
@@ -85,8 +85,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
- regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+ stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
}
@Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index 28eba62..aaeac3a 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -7,9 +7,10 @@ import com.aliyun.eventbridge.models.PutEventsResponse;
import com.aliyun.eventbridge.util.EventBuilder;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
+import com.aliyuncs.http.FormatType;
import com.aliyuncs.profile.DefaultProfile;
-import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
-import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleWithServiceIdentityRequest;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleWithServiceIdentityResponse;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
@@ -27,12 +28,12 @@ import java.util.List;
public class EventBridgeSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(EventBridgeSinkTask.class);
- private String regionId;
-
private String accessKeyId;
private String accessKeySecret;
+ private String stsEndpoint;
+
private String roleArn;
private String roleSessionName;
@@ -61,7 +62,7 @@ public class EventBridgeSinkTask extends SinkTask {
.withAliyunEventBus(aliyuneventbusname)
.build()));
PutEventsResponse putEventsResponse = eventBridgeClient.putEvents(cloudEventList);
- log.info("EventBridgeSinkTask | put | putEventsResponse | entryList : {} | requestId : {}", putEventsResponse.getEntryList(), putEventsResponse.getRequestId());
+ log.info("EventBridgeSinkTask | put | putEventsResponse | eventId : {} | traceId : {} | requestId : {}", putEventsResponse.getEntryList().get(0).getEventId(), putEventsResponse.getEntryList().get(0).getTraceId(), putEventsResponse.getRequestId());
} catch (Exception e) {
log.error("EventBridgeSinkTask | put | error => ", e);
throw new RuntimeException(e);
@@ -92,21 +93,24 @@ public class EventBridgeSinkTask extends SinkTask {
eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
- regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+ stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
}
@Override
public void start(SinkTaskContext sinkTaskContext) {
super.start(sinkTaskContext);
try {
- DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKeyId, accessKeySecret);
+ DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
+ DefaultProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret);
IAcsClient client = new DefaultAcsClient(profile);
- AssumeRoleRequest request = new AssumeRoleRequest();
- request.setRegionId(regionId);
+ AssumeRoleWithServiceIdentityRequest request = new AssumeRoleWithServiceIdentityRequest();
request.setRoleArn(roleArn);
request.setRoleSessionName(roleSessionName);
- AssumeRoleResponse response = client.getAcsResponse(request);
+ request.setAssumeRoleFor(roleSessionName);
+ request.setAcceptFormat(FormatType.JSON);
+ request.setDurationSeconds(3600L);
+ final AssumeRoleWithServiceIdentityResponse response = client.getAcsResponse(request);
Config authConfig = new Config();
authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
index 9ae33d0..8c96842 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
@@ -13,5 +13,5 @@ public class EventBridgeConstant {
public static final String EVENT_TYPE = "type";
public static final String EVENT_SUBJECT = "eventSubject";
public static final String ALIYUN_EVENT_BUS_NAME = "aliyuneventbusname";
- public static final String REGION_ID_CONSTANT = "regionId";
+ public static final String STS_ENDPOINT = "stsEndpoint";
}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
index 5add642..670fd87 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -29,8 +29,8 @@ public class EventBridgeSinkTest {
public void testPut() {
EventBridgeSinkTask eventBridgeSinkTask = new EventBridgeSinkTask();
KeyValue keyValue = new DefaultKeyValue();
- keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, "xxxx");
keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, "xxxx");
+ keyValue.put(EventBridgeConstant.STS_ENDPOINT, "xxxx");
keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, "xxxx");
keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
keyValue.put(EventBridgeConstant.ROLE_ARN, "xxxx");