You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:19 UTC
[rocketmq-connect] 02/10: update eventbridge sts init
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 26ff79e93cf06dce11c913b90fb0ab0d96a24c8f
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Wed Apr 27 11:16:38 2022 +0800
update eventbridge sts init
---
.../aliyun/rocketmq-connect-eventbridge/README.md | 18 ++++-----
.../aliyun/rocketmq-connect-eventbridge/pom.xml | 18 +++++++++
.../eventbridge/sink/EventBridgeSinkConnector.java | 36 +++++++++---------
.../eventbridge/sink/EventBridgeSinkTask.java | 43 ++++++++++++++--------
.../sink/constant/EventBridgeConstant.java | 9 +++--
.../eventbridge/sink/EventBridgeSinkTest.java | 11 ++++--
6 files changed, 85 insertions(+), 50 deletions(-)
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index d4eca92..f6d9489 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,15 +16,14 @@ mvn clean install -Dmaven.test.skip=true
```
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}","eventId":"${eventId}","eventSource":"${eventSource}","eventType":"${eventType}",
-"eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"regionId”:"${regionId}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
```
例子
```
http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",accountEndpoint”:"xxxx","eventId":"xxxx",
-"eventSource":"xxxx","eventType":"", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
+"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","regionId”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
+roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
```
>**注:** `rocketmq-eventbridge-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
@@ -40,12 +39,11 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
| KEY | TYPE | Must be filled | Description | Example
|------------------------|---------|----------------|----------------------------------|--|
-|accessKeyId | String | YES | 阿里云身份验证,在阿里云用户信息管理控制台获取 | xxxx |
-|accessKeySecret | String | YES | 阿里云身份验证,在阿里云用户信息管理控制台获取 | xxx |
-|accountEndpoint | String | YES | 阿里云EventBridge官方接入点 | xxxx |
-|eventId | String | YES | 事件ID | xxxx |
-|eventSource | String | YES | 事件源 | xxxx |
-|eventType | String | YES | 事件类型 | null |
+|regionId | String | YES | 地域 | cn-hangzhou|
+|accessKeyId | String | YES | 阿里云授信账号的AK | xxxx |
+|accessKeySecret | String | YES | 阿里云授信账号的SK | xxx |
+|roleArn | String | YES | 要扮演的RAM角色ARN。 该角色是可信实体为阿里云账号类型的RAM角色 | xxxx |
+|roleSessionName | String | YES | 角色会话名称。 该参数为用户自定义参数。通常设置为调用该API的用户身份 | 用户名 |
|eventTime | String | YES | 事件产生的时间 | xxxx |
|eventSubject | String | NO | 事件主题 | xxxx |
|aliyuneventbusname | String | YES | 接收事件的事件总线名称 | xxxx |
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
index 469d1fe..4730384 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
@@ -21,6 +21,9 @@
<commons-lang3.version>3.12.0</commons-lang3.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<eventbridge-client.version>1.3.5</eventbridge-client.version>
+ <aliyun-java-sdk-sts.version>3.1.0</aliyun-java-sdk-sts.version>
+ <aliyun-java-sdk-core.version>4.6.0</aliyun-java-sdk-core.version>
+ <gson.version>2.9.0</gson.version>
</properties>
<build>
@@ -189,6 +192,21 @@
<artifactId>eventbridge-client</artifactId>
<version>${eventbridge-client.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun</groupId>
+ <artifactId>aliyun-java-sdk-core</artifactId>
+ <version>${aliyun-java-sdk-core.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun</groupId>
+ <artifactId>aliyun-java-sdk-sts</artifactId>
+ <version>${aliyun-java-sdk-sts.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index f986a0a..967c50b 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -12,24 +12,24 @@ import java.util.List;
public class EventBridgeSinkConnector extends SinkConnector {
+ private String regionId;
+
private String accessKeyId;
private String accessKeySecret;
- private String accountEndpoint;
-
- private String eventId;
+ private String roleArn;
- private String eventSource;
+ private String roleSessionName;
private String eventTime;
- private String eventType;
-
private String eventSubject;
private String aliyuneventbusname;
+ private String accountEndpoint;
+
@Override
public void pause() {
@@ -44,15 +44,15 @@ public class EventBridgeSinkConnector extends SinkConnector {
public List<KeyValue> taskConfigs(int maxTasks) {
List<KeyValue> keyValueList = new ArrayList<>(11);
KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, regionId);
keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, accessKeyId);
keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
- keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
- keyValue.put(EventBridgeConstant.EVENT_ID, eventId);
- keyValue.put(EventBridgeConstant.EVENT_SOURCE, eventSource);
+ keyValue.put(EventBridgeConstant.ROLE_ARN, roleArn);
+ keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, roleSessionName);
keyValue.put(EventBridgeConstant.EVENT_TIME, eventTime);
- keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
+ keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
keyValueList.add(keyValue);
return keyValueList;
}
@@ -67,11 +67,11 @@ public class EventBridgeSinkConnector extends SinkConnector {
if (StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_ID))
|| StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_SECRET))
|| StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
- || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_ID))
- || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SOURCE))
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_ARN))
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_SESSION_NAME))
|| StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TIME))
- || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))
- || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))) {
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.REGION_ID_CONSTANT))) {
throw new RuntimeException("EventBridge required parameter is null !");
}
}
@@ -80,13 +80,13 @@ public class EventBridgeSinkConnector extends SinkConnector {
public void init(KeyValue config) {
accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
- accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
- eventId = config.getString(EventBridgeConstant.EVENT_ID);
- eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+ roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
+ roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
- eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+ regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
+ accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
}
@Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index 3af4564..28eba62 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -5,6 +5,11 @@ import com.aliyun.eventbridge.models.CloudEvent;
import com.aliyun.eventbridge.models.Config;
import com.aliyun.eventbridge.models.PutEventsResponse;
import com.aliyun.eventbridge.util.EventBuilder;
+import com.aliyuncs.DefaultAcsClient;
+import com.aliyuncs.IAcsClient;
+import com.aliyuncs.profile.DefaultProfile;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
@@ -22,24 +27,24 @@ import java.util.List;
public class EventBridgeSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(EventBridgeSinkTask.class);
+ private String regionId;
+
private String accessKeyId;
private String accessKeySecret;
- private String accountEndpoint;
-
- private String eventId;
+ private String roleArn;
- private String eventSource;
+ private String roleSessionName;
private String eventTime;
- private String eventType;
-
private String eventSubject;
private String aliyuneventbusname;
+ private String accountEndpoint;
+
private EventBridgeClient eventBridgeClient;
@Override
@@ -47,9 +52,9 @@ public class EventBridgeSinkTask extends SinkTask {
List<CloudEvent> cloudEventList = new ArrayList<>();
try {
sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
- .withId(eventId)
- .withSource(URI.create(eventSource))
- .withType(eventType)
+ .withId(connectRecord.getExtension(EventBridgeConstant.EVENT_ID))
+ .withSource(URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)))
+ .withType(connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE))
.withSubject(eventSubject)
.withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
.withJsonStringData(connectRecord.getData().toString())
@@ -82,22 +87,30 @@ public class EventBridgeSinkTask extends SinkTask {
public void init(KeyValue config) {
accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
- accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
- eventId = config.getString(EventBridgeConstant.EVENT_ID);
- eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+ roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
+ roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
- eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+ regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
+ accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
}
@Override
public void start(SinkTaskContext sinkTaskContext) {
super.start(sinkTaskContext);
try {
+ DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKeyId, accessKeySecret);
+ IAcsClient client = new DefaultAcsClient(profile);
+ AssumeRoleRequest request = new AssumeRoleRequest();
+ request.setRegionId(regionId);
+ request.setRoleArn(roleArn);
+ request.setRoleSessionName(roleSessionName);
+ AssumeRoleResponse response = client.getAcsResponse(request);
Config authConfig = new Config();
- authConfig.accessKeyId = accessKeyId;
- authConfig.accessKeySecret = accessKeySecret;
+ authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
+ authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
+ authConfig.securityToken = response.getCredentials().getSecurityToken();
authConfig.endpoint = accountEndpoint;
eventBridgeClient = new EventBridgeClient(authConfig);
} catch (Exception e) {
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
index 51a7508..9ae33d0 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
@@ -5,10 +5,13 @@ public class EventBridgeConstant {
public static final String ACCESS_KEY_ID = "accessKeyId";
public static final String ACCESS_KEY_SECRET = "accessKeySecret";
public static final String ACCOUNT_ENDPOINT = "accountEndpoint";
- public static final String EVENT_ID = "eventId";
- public static final String EVENT_SOURCE = "eventSource";
+ public static final String ROLE_ARN = "roleArn";
+ public static final String ROLE_SESSION_NAME = "roleSessionName";
+ public static final String EVENT_ID = "id";
+ public static final String EVENT_SOURCE = "source";
public static final String EVENT_TIME = "eventTime";
- public static final String EVENT_TYPE = "eventType";
+ public static final String EVENT_TYPE = "type";
public static final String EVENT_SUBJECT = "eventSubject";
public static final String ALIYUN_EVENT_BUS_NAME = "aliyuneventbusname";
+ public static final String REGION_ID_CONSTANT = "regionId";
}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
index 030794c..5add642 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -29,13 +29,13 @@ public class EventBridgeSinkTest {
public void testPut() {
EventBridgeSinkTask eventBridgeSinkTask = new EventBridgeSinkTask();
KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, "xxxx");
keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, "xxxx");
- keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, "xxxx");
- keyValue.put(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
- keyValue.put(EventBridgeConstant.EVENT_SOURCE, "xxxx");
+ keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
+ keyValue.put(EventBridgeConstant.ROLE_ARN, "xxxx");
+ keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, "xxxx");
keyValue.put(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
- keyValue.put(EventBridgeConstant.EVENT_TYPE, "xxxx");
keyValue.put(EventBridgeConstant.EVENT_SUBJECT, "xxxx");
keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, "xxxx");
eventBridgeSinkTask.init(keyValue);
@@ -44,6 +44,9 @@ public class EventBridgeSinkTest {
connectRecord.setData("{\n" +
"\t\"test\" : \"test\"\n" +
"}");
+ connectRecord.addExtension(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
+ connectRecord.addExtension(EventBridgeConstant.EVENT_SOURCE, "xxxx");
+ connectRecord.addExtension(EventBridgeConstant.EVENT_TYPE, "xxxx");
connectRecordList.add(connectRecord);
eventBridgeSinkTask.start(new SinkTaskContext() {
@Override