Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:17 UTC

[rocketmq-connect] branch master updated (732f127 -> ffa4cdb)

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


    from 732f127  [ISSUE #147] Transformchain adds a close method to unload the objects in the transform when the connector is closed #147 (#149)
     new 8b8c1d1  add eventbridge sink
     new 26ff79e  update eventbridge sts init
     new 0229214  update eventbridge sts init
     new d516617  add eventType
     new 5228be0  update eventTime
     new 472c6e7  update pom version
     new c621ebb  update validate
     new 090bddc  update README.md
     new 650b169  fix "null" check
     new ffa4cdb  update README.md

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../aliyun/rocketmq-connect-eventbridge/README.md  |  51 ++++++++
 .../pom.xml                                        |  56 +++++----
 .../eventbridge/sink/EventBridgeSinkConnector.java |  89 ++++++++++++++
 .../eventbridge/sink/EventBridgeSinkTask.java      | 135 +++++++++++++++++++++
 .../sink/constant/EventBridgeConstant.java         |  17 +++
 .../connect/eventbridge/sink/utils/CheckUtils.java |  25 ++++
 .../connect/eventbridge/sink/utils/DateUtils.java  |  23 ++++
 .../eventbridge/sink/EventBridgeSinkTest.java      |  90 ++++++++++++++
 8 files changed, 462 insertions(+), 24 deletions(-)
 create mode 100644 connectors/aliyun/rocketmq-connect-eventbridge/README.md
 copy connectors/aliyun/{rocketmq-connect-fc => rocketmq-connect-eventbridge}/pom.xml (82%)
 create mode 100644 connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
 create mode 100644 connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
 create mode 100644 connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
 create mode 100644 connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/CheckUtils.java
 create mode 100644 connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
 create mode 100644 connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java


[rocketmq-connect] 06/10: update pom version

Posted by yu...@apache.org.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 472c6e72d5e653cfacfc55ba2e5f3d726e84a8a4
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Tue May 31 11:22:28 2022 +0800

    update pom version
---
 connectors/aliyun/rocketmq-connect-eventbridge/pom.xml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
index ba1875c..57084ce 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
@@ -16,13 +16,13 @@
         <logback.version>1.2.11</logback.version>
         <junit.version>4.13.2</junit.version>
         <assertj.version>3.22.0</assertj.version>
-        <mockito.version>4.4.0</mockito.version>
+        <mockito.version>4.5.1</mockito.version>
         <openmessaging-connector.version>0.1.2</openmessaging-connector.version>
         <commons-lang3.version>3.12.0</commons-lang3.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <eventbridge-client.version>1.3.5</eventbridge-client.version>
-        <aliyun-java-sdk-sts.version>2.2.0</aliyun-java-sdk-sts.version>
-        <aliyun-java-sdk-core.version>4.5.24</aliyun-java-sdk-core.version>
+        <eventbridge-client.version>1.3.6</eventbridge-client.version>
+        <aliyun-java-sdk-sts.version>3.1.0</aliyun-java-sdk-sts.version>
+        <aliyun-java-sdk-core.version>4.6.0</aliyun-java-sdk-core.version>
         <aliyun-java-sdk-sts-internal.version>3.0.0</aliyun-java-sdk-sts-internal.version>
         <gson.version>2.9.0</gson.version>
     </properties>


[rocketmq-connect] 02/10: update eventbridge sts init

Posted by yu...@apache.org.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 26ff79e93cf06dce11c913b90fb0ab0d96a24c8f
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Wed Apr 27 11:16:38 2022 +0800

    update eventbridge sts init
---
 .../aliyun/rocketmq-connect-eventbridge/README.md  | 18 ++++-----
 .../aliyun/rocketmq-connect-eventbridge/pom.xml    | 18 +++++++++
 .../eventbridge/sink/EventBridgeSinkConnector.java | 36 +++++++++---------
 .../eventbridge/sink/EventBridgeSinkTask.java      | 43 ++++++++++++++--------
 .../sink/constant/EventBridgeConstant.java         |  9 +++--
 .../eventbridge/sink/EventBridgeSinkTest.java      | 11 ++++--
 6 files changed, 85 insertions(+), 50 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index d4eca92..f6d9489 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,15 +16,14 @@ mvn clean install -Dmaven.test.skip=true
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
 ?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}","eventId":"${eventId}","eventSource":"${eventSource}","eventType":"${eventType}", 
-"eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"regionId”:"${regionId}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
 ```
 
 Example
 ```
 http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",accountEndpoint”:"xxxx","eventId":"xxxx",
-"eventSource":"xxxx","eventType":"", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
+"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","regionId”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
+roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
 ```
 
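 >**Note:** Starting `rocketmq-eventbridge-connect` depends on the `rocketmq-connect-runtime` project being started. Place all the built `jar` packages in the path configured by `pluginPaths` in the `runtime` project before sending the start request above. This value is set in the `connect.conf` file of the `runtime` project.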
@@ -40,12 +39,11 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
 
 |         KEY            |  TYPE   | Must be filled | Description                      | Example
 |------------------------|---------|----------------|----------------------------------|--|
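-|accessKeyId             | String  | YES            | Alibaba Cloud authentication credential, obtained from the Alibaba Cloud user information management console | xxxx |
-|accessKeySecret         | String  | YES            | Alibaba Cloud authentication credential, obtained from the Alibaba Cloud user information management console | xxx |
-|accountEndpoint         | String  | YES            | Official Alibaba Cloud EventBridge endpoint | xxxx |
-|eventId                 | String  | YES            | Event ID | xxxx |
-|eventSource             | String  | YES            | Event source | xxxx |
-|eventType               | String | YES             | Event type | null |
+|regionId                | String  | YES            | Region | cn-hangzhou|
+|accessKeyId             | String  | YES            | AccessKey ID (AK) of the trusted Alibaba Cloud account | xxxx |
+|accessKeySecret         | String  | YES            | AccessKey secret (SK) of the trusted Alibaba Cloud account | xxx |
+|roleArn                 | String  | YES            | ARN of the RAM role to assume. The role must be a RAM role whose trusted entity is an Alibaba Cloud account | xxxx |
+|roleSessionName         | String  | YES            | Role session name. A user-defined value, usually set to the identity of the user calling the API | username |
 |eventTime               | String | YES             | Time the event was generated | xxxx |
 |eventSubject            | String | NO              | Event subject | xxxx |
 |aliyuneventbusname      | String | YES             | Name of the event bus that receives the event | xxxx |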
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
index 469d1fe..4730384 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
@@ -21,6 +21,9 @@
         <commons-lang3.version>3.12.0</commons-lang3.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <eventbridge-client.version>1.3.5</eventbridge-client.version>
+        <aliyun-java-sdk-sts.version>3.1.0</aliyun-java-sdk-sts.version>
+        <aliyun-java-sdk-core.version>4.6.0</aliyun-java-sdk-core.version>
+        <gson.version>2.9.0</gson.version>
     </properties>
 
     <build>
@@ -189,6 +192,21 @@
             <artifactId>eventbridge-client</artifactId>
             <version>${eventbridge-client.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>${gson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-core</artifactId>
+            <version>${aliyun-java-sdk-core.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-sts</artifactId>
+            <version>${aliyun-java-sdk-sts.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index f986a0a..967c50b 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -12,24 +12,24 @@ import java.util.List;
 
 public class EventBridgeSinkConnector extends SinkConnector {
 
+    private String regionId;
+
     private String accessKeyId;
 
     private String accessKeySecret;
 
-    private String accountEndpoint;
-
-    private String eventId;
+    private String roleArn;
 
-    private String eventSource;
+    private String roleSessionName;
 
     private String eventTime;
 
-    private String eventType;
-
     private String eventSubject;
 
     private String aliyuneventbusname;
 
+    private String accountEndpoint;
+
     @Override
     public void pause() {
 
@@ -44,15 +44,15 @@ public class EventBridgeSinkConnector extends SinkConnector {
     public List<KeyValue> taskConfigs(int maxTasks) {
         List<KeyValue> keyValueList = new ArrayList<>(11);
         KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, regionId);
         keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, accessKeyId);
         keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
-        keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
-        keyValue.put(EventBridgeConstant.EVENT_ID, eventId);
-        keyValue.put(EventBridgeConstant.EVENT_SOURCE, eventSource);
+        keyValue.put(EventBridgeConstant.ROLE_ARN, roleArn);
+        keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, roleSessionName);
         keyValue.put(EventBridgeConstant.EVENT_TIME, eventTime);
-        keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
         keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
+        keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
         keyValueList.add(keyValue);
         return keyValueList;
     }
@@ -67,11 +67,11 @@ public class EventBridgeSinkConnector extends SinkConnector {
         if (StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_ID))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_SECRET))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_ID))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SOURCE))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_ARN))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_SESSION_NAME))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TIME))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))) {
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.REGION_ID_CONSTANT))) {
             throw new RuntimeException("EventBridge required parameter is null !");
         }
     }
@@ -80,13 +80,13 @@ public class EventBridgeSinkConnector extends SinkConnector {
     public void init(KeyValue config) {
         accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
-        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
-        eventId = config.getString(EventBridgeConstant.EVENT_ID);
-        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+        roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
+        roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
         eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
-        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+        regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
+        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index 3af4564..28eba62 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -5,6 +5,11 @@ import com.aliyun.eventbridge.models.CloudEvent;
 import com.aliyun.eventbridge.models.Config;
 import com.aliyun.eventbridge.models.PutEventsResponse;
 import com.aliyun.eventbridge.util.EventBuilder;
+import com.aliyuncs.DefaultAcsClient;
+import com.aliyuncs.IAcsClient;
+import com.aliyuncs.profile.DefaultProfile;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
@@ -22,24 +27,24 @@ import java.util.List;
 public class EventBridgeSinkTask extends SinkTask {
     private static final Logger log = LoggerFactory.getLogger(EventBridgeSinkTask.class);
 
+    private String regionId;
+
     private String accessKeyId;
 
     private String accessKeySecret;
 
-    private String accountEndpoint;
-
-    private String eventId;
+    private String roleArn;
 
-    private String eventSource;
+    private String roleSessionName;
 
     private String eventTime;
 
-    private String eventType;
-
     private String eventSubject;
 
     private String aliyuneventbusname;
 
+    private String accountEndpoint;
+
     private EventBridgeClient eventBridgeClient;
 
     @Override
@@ -47,9 +52,9 @@ public class EventBridgeSinkTask extends SinkTask {
         List<CloudEvent> cloudEventList = new ArrayList<>();
         try {
             sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
-                    .withId(eventId)
-                    .withSource(URI.create(eventSource))
-                    .withType(eventType)
+                    .withId(connectRecord.getExtension(EventBridgeConstant.EVENT_ID))
+                    .withSource(URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)))
+                    .withType(connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE))
                     .withSubject(eventSubject)
                     .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
                     .withJsonStringData(connectRecord.getData().toString())
@@ -82,22 +87,30 @@ public class EventBridgeSinkTask extends SinkTask {
     public void init(KeyValue config) {
         accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
-        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
-        eventId = config.getString(EventBridgeConstant.EVENT_ID);
-        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+        roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
+        roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
         eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
-        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+        regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
+        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
     }
 
     @Override
     public void start(SinkTaskContext sinkTaskContext) {
         super.start(sinkTaskContext);
         try {
+            DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKeyId, accessKeySecret);
+            IAcsClient client = new DefaultAcsClient(profile);
+            AssumeRoleRequest request = new AssumeRoleRequest();
+            request.setRegionId(regionId);
+            request.setRoleArn(roleArn);
+            request.setRoleSessionName(roleSessionName);
+            AssumeRoleResponse response = client.getAcsResponse(request);
             Config authConfig = new Config();
-            authConfig.accessKeyId = accessKeyId;
-            authConfig.accessKeySecret = accessKeySecret;
+            authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
+            authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
+            authConfig.securityToken = response.getCredentials().getSecurityToken();
             authConfig.endpoint = accountEndpoint;
             eventBridgeClient = new EventBridgeClient(authConfig);
         } catch (Exception e) {
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
index 51a7508..9ae33d0 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
@@ -5,10 +5,13 @@ public class EventBridgeConstant {
     public static final String ACCESS_KEY_ID = "accessKeyId";
     public static final String ACCESS_KEY_SECRET = "accessKeySecret";
     public static final String ACCOUNT_ENDPOINT = "accountEndpoint";
-    public static final String EVENT_ID = "eventId";
-    public static final String EVENT_SOURCE = "eventSource";
+    public static final String ROLE_ARN = "roleArn";
+    public static final String ROLE_SESSION_NAME = "roleSessionName";
+    public static final String EVENT_ID = "id";
+    public static final String EVENT_SOURCE = "source";
     public static final String EVENT_TIME = "eventTime";
-    public static final String EVENT_TYPE = "eventType";
+    public static final String EVENT_TYPE = "type";
     public static final String EVENT_SUBJECT = "eventSubject";
     public static final String ALIYUN_EVENT_BUS_NAME = "aliyuneventbusname";
+    public static final String REGION_ID_CONSTANT = "regionId";
 }
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
index 030794c..5add642 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -29,13 +29,13 @@ public class EventBridgeSinkTest {
     public void testPut() {
         EventBridgeSinkTask eventBridgeSinkTask = new EventBridgeSinkTask();
         KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, "xxxx");
         keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, "xxxx");
-        keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, "xxxx");
-        keyValue.put(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
-        keyValue.put(EventBridgeConstant.EVENT_SOURCE, "xxxx");
+        keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
+        keyValue.put(EventBridgeConstant.ROLE_ARN, "xxxx");
+        keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, "xxxx");
         keyValue.put(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
-        keyValue.put(EventBridgeConstant.EVENT_TYPE, "xxxx");
         keyValue.put(EventBridgeConstant.EVENT_SUBJECT, "xxxx");
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, "xxxx");
         eventBridgeSinkTask.init(keyValue);
@@ -44,6 +44,9 @@ public class EventBridgeSinkTest {
         connectRecord.setData("{\n" +
                 "\t\"test\" :  \"test\"\n" +
                 "}");
+        connectRecord.addExtension(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
+        connectRecord.addExtension(EventBridgeConstant.EVENT_SOURCE, "xxxx");
+        connectRecord.addExtension(EventBridgeConstant.EVENT_TYPE, "xxxx");
         connectRecordList.add(connectRecord);
         eventBridgeSinkTask.start(new SinkTaskContext() {
             @Override
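
Note on commit 02/10 above: the task no longer reads the event id, source, and type from connector configuration. It reads them from each record's extensions under the keys "id", "source", and "type". `URI.create` throws a `NullPointerException` for a null argument, so if `getExtension` returns null for a missing key (an assumption about the connector API), a record without a "source" extension fails inside the builder chain. The sketch below shows one possible way to make that failure explicit. It is not part of the commit: the class `ExtensionAttributesSketch`, the helper `requireExtension`, and the plain `Map` standing in for a record's extensions are all hypothetical names chosen for illustration.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class ExtensionAttributesSketch {
    // Key names mirror EventBridgeConstant after commit 02/10.
    static final String EVENT_ID = "id";
    static final String EVENT_SOURCE = "source";
    static final String EVENT_TYPE = "type";

    // Hypothetical helper: returns the extension value, or fails with a
    // message that names the missing key instead of a bare NullPointerException.
    static String requireExtension(Map<String, String> extensions, String key) {
        String value = extensions.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("missing record extension: " + key);
        }
        return value;
    }

    // Resolves the event source as a URI, as the task does via URI.create.
    static URI eventSource(Map<String, String> extensions) {
        return URI.create(requireExtension(extensions, EVENT_SOURCE));
    }

    public static void main(String[] args) {
        // A plain Map stands in for the extensions of a ConnectRecord (assumption).
        Map<String, String> extensions = new HashMap<>();
        extensions.put(EVENT_ID, "evt-1");
        extensions.put(EVENT_SOURCE, "acs.example");
        extensions.put(EVENT_TYPE, "example:Type");

        System.out.println(requireExtension(extensions, EVENT_ID));
        System.out.println(eventSource(extensions));
        System.out.println(requireExtension(extensions, EVENT_TYPE));
    }
}
```

A check of this kind could run before the builder chain in `put`, so that a malformed record is reported with the name of the missing attribute.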


[rocketmq-connect] 03/10: update eventbridge sts init

Posted by yu...@apache.org.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 02292146999b1f47d49553c91a3cb0b120a14ea8
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Mon May 9 14:23:28 2022 +0800

    update eventbridge sts init
---
 .../aliyun/rocketmq-connect-eventbridge/README.md  |  6 +++---
 .../aliyun/rocketmq-connect-eventbridge/pom.xml    | 10 +++++++--
 .../eventbridge/sink/EventBridgeSinkConnector.java | 10 ++++-----
 .../eventbridge/sink/EventBridgeSinkTask.java      | 24 +++++++++++++---------
 .../sink/constant/EventBridgeConstant.java         |  2 +-
 .../eventbridge/sink/EventBridgeSinkTest.java      |  2 +-
 6 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index f6d9489..f22b5fe 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,13 +16,13 @@ mvn clean install -Dmaven.test.skip=true
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
 ?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,"regionId”:"${regionId}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
 ```
 
 Example
 ```
 http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
-"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","regionId”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
+"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","stsEndpoint”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
 roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
 ```
 
@@ -39,7 +39,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
 
 |         KEY            |  TYPE   | Must be filled | Description                      | Example
 |------------------------|---------|----------------|----------------------------------|--|
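-|regionId                | String  | YES            | Region | cn-hangzhou|
+|stsEndpoint             | String  | YES            | STS endpoint                       | xxxx|
 |accessKeyId             | String  | YES            | AccessKey ID (AK) of the trusted Alibaba Cloud account | xxxx |
 |accessKeySecret         | String  | YES            | AccessKey secret (SK) of the trusted Alibaba Cloud account | xxx |
 |roleArn                 | String  | YES            | ARN of the RAM role to assume. The role must be a RAM role whose trusted entity is an Alibaba Cloud account | xxxx |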
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
index 4730384..ba1875c 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
@@ -21,8 +21,9 @@
         <commons-lang3.version>3.12.0</commons-lang3.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <eventbridge-client.version>1.3.5</eventbridge-client.version>
-        <aliyun-java-sdk-sts.version>3.1.0</aliyun-java-sdk-sts.version>
-        <aliyun-java-sdk-core.version>4.6.0</aliyun-java-sdk-core.version>
+        <aliyun-java-sdk-sts.version>2.2.0</aliyun-java-sdk-sts.version>
+        <aliyun-java-sdk-core.version>4.5.24</aliyun-java-sdk-core.version>
+        <aliyun-java-sdk-sts-internal.version>3.0.0</aliyun-java-sdk-sts-internal.version>
         <gson.version>2.9.0</gson.version>
     </properties>
 
@@ -207,6 +208,11 @@
             <artifactId>aliyun-java-sdk-sts</artifactId>
             <version>${aliyun-java-sdk-sts.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-sts-internal</artifactId>
+            <version>${aliyun-java-sdk-sts-internal.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 967c50b..1a7a0b1 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -12,8 +12,6 @@ import java.util.List;
 
 public class EventBridgeSinkConnector extends SinkConnector {
 
-    private String regionId;
-
     private String accessKeyId;
 
     private String accessKeySecret;
@@ -30,6 +28,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
 
     private String accountEndpoint;
 
+    private String stsEndpoint;
+
     @Override
     public void pause() {
 
@@ -44,7 +44,6 @@ public class EventBridgeSinkConnector extends SinkConnector {
     public List<KeyValue> taskConfigs(int maxTasks) {
         List<KeyValue> keyValueList = new ArrayList<>(11);
         KeyValue keyValue = new DefaultKeyValue();
-        keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, regionId);
         keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, accessKeyId);
         keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
         keyValue.put(EventBridgeConstant.ROLE_ARN, roleArn);
@@ -53,6 +52,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
         keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
+        keyValue.put(EventBridgeConstant.STS_ENDPOINT, stsEndpoint);
         keyValueList.add(keyValue);
         return keyValueList;
     }
@@ -71,7 +71,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_SESSION_NAME))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TIME))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.REGION_ID_CONSTANT))) {
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.STS_ENDPOINT))) {
             throw new RuntimeException("EventBridge required parameter is null !");
         }
     }
@@ -85,8 +85,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
         eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
-        regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+        stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index 28eba62..aaeac3a 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -7,9 +7,10 @@ import com.aliyun.eventbridge.models.PutEventsResponse;
 import com.aliyun.eventbridge.util.EventBuilder;
 import com.aliyuncs.DefaultAcsClient;
 import com.aliyuncs.IAcsClient;
+import com.aliyuncs.http.FormatType;
 import com.aliyuncs.profile.DefaultProfile;
-import com.aliyuncs.sts.model.v20150401.AssumeRoleRequest;
-import com.aliyuncs.sts.model.v20150401.AssumeRoleResponse;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleWithServiceIdentityRequest;
+import com.aliyuncs.sts.model.v20150401.AssumeRoleWithServiceIdentityResponse;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
@@ -27,12 +28,12 @@ import java.util.List;
 public class EventBridgeSinkTask extends SinkTask {
     private static final Logger log = LoggerFactory.getLogger(EventBridgeSinkTask.class);
 
-    private String regionId;
-
     private String accessKeyId;
 
     private String accessKeySecret;
 
+    private String stsEndpoint;
+
     private String roleArn;
 
     private String roleSessionName;
@@ -61,7 +62,7 @@ public class EventBridgeSinkTask extends SinkTask {
                     .withAliyunEventBus(aliyuneventbusname)
                     .build()));
             PutEventsResponse putEventsResponse = eventBridgeClient.putEvents(cloudEventList);
-            log.info("EventBridgeSinkTask | put | putEventsResponse | entryList : {} | requestId : {}", putEventsResponse.getEntryList(), putEventsResponse.getRequestId());
+            log.info("EventBridgeSinkTask | put | putEventsResponse | eventId : {} | traceId : {} | requestId : {}", putEventsResponse.getEntryList().get(0).getEventId(), putEventsResponse.getEntryList().get(0).getTraceId(), putEventsResponse.getRequestId());
         } catch (Exception e) {
             log.error("EventBridgeSinkTask | put | error => ", e);
             throw new RuntimeException(e);
@@ -92,21 +93,24 @@ public class EventBridgeSinkTask extends SinkTask {
         eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
-        regionId = config.getString(EventBridgeConstant.REGION_ID_CONSTANT);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+        stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
     }
 
     @Override
     public void start(SinkTaskContext sinkTaskContext) {
         super.start(sinkTaskContext);
         try {
-            DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKeyId, accessKeySecret);
+            DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
+            DefaultProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret);
             IAcsClient client = new DefaultAcsClient(profile);
-            AssumeRoleRequest request = new AssumeRoleRequest();
-            request.setRegionId(regionId);
+            AssumeRoleWithServiceIdentityRequest request = new AssumeRoleWithServiceIdentityRequest();
             request.setRoleArn(roleArn);
             request.setRoleSessionName(roleSessionName);
-            AssumeRoleResponse response = client.getAcsResponse(request);
+            request.setAssumeRoleFor(roleSessionName);
+            request.setAcceptFormat(FormatType.JSON);
+            request.setDurationSeconds(3600L);
+            final AssumeRoleWithServiceIdentityResponse response = client.getAcsResponse(request);
             Config authConfig = new Config();
             authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
             authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
index 9ae33d0..8c96842 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
@@ -13,5 +13,5 @@ public class EventBridgeConstant {
     public static final String EVENT_TYPE = "type";
     public static final String EVENT_SUBJECT = "eventSubject";
     public static final String ALIYUN_EVENT_BUS_NAME = "aliyuneventbusname";
-    public static final String REGION_ID_CONSTANT = "regionId";
+    public static final String STS_ENDPOINT = "stsEndpoint";
 }
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
index 5add642..670fd87 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -29,8 +29,8 @@ public class EventBridgeSinkTest {
     public void testPut() {
         EventBridgeSinkTask eventBridgeSinkTask = new EventBridgeSinkTask();
         KeyValue keyValue = new DefaultKeyValue();
-        keyValue.put(EventBridgeConstant.REGION_ID_CONSTANT, "xxxx");
         keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, "xxxx");
+        keyValue.put(EventBridgeConstant.STS_ENDPOINT, "xxxx");
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, "xxxx");
         keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
         keyValue.put(EventBridgeConstant.ROLE_ARN, "xxxx");

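The `validate` hunk above swaps the `REGION_ID_CONSTANT` check for `STS_ENDPOINT` but still reports every failure with the same generic message. Below is a minimal sketch, not taken from this commit, of a check that names the missing keys. It uses a plain `Map` in place of the connector's `KeyValue`, approximates `StringUtils.isBlank` with `trim().isEmpty()`, and lists only the three keys whose string values are visible in these diffs. `RequiredKeysSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RequiredKeysSketch {

    // Illustrative subset of required keys; the real connector checks more.
    static final List<String> REQUIRED =
            Arrays.asList("eventTime", "aliyuneventbusname", "stsEndpoint");

    // Returns the required keys that are absent or whitespace-only, in REQUIRED order.
    static List<String> missingKeys(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Throws with a message that lists exactly which keys need attention.
    static void validate(Map<String, String> config) {
        List<String> missing = missingKeys(config);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException(
                    "EventBridge required parameters are missing: " + missing);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("eventTime", "2022-04-24 16:12:00");
        config.put("stsEndpoint", " ");
        System.out.println(missingKeys(config));
    }
}
```

Collecting the missing keys before throwing lets an operator fix a bad config in one pass instead of rediscovering the failures one at a time.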

[rocketmq-connect] 01/10: add eventbridge sink

Posted by yu...@apache.org.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 8b8c1d1476868db9d7e45f76e8e07402567194a0
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Sun Apr 24 16:50:34 2022 +0800

    add eventbridge sink
---
 .../aliyun/rocketmq-connect-eventbridge/README.md  |  53 ++++++
 .../aliyun/rocketmq-connect-eventbridge/pom.xml    | 194 +++++++++++++++++++++
 .../eventbridge/sink/EventBridgeSinkConnector.java |  96 ++++++++++
 .../eventbridge/sink/EventBridgeSinkTask.java      | 114 ++++++++++++
 .../sink/constant/EventBridgeConstant.java         |  14 ++
 .../connect/eventbridge/sink/utils/DateUtils.java  |  16 ++
 .../eventbridge/sink/EventBridgeSinkTest.java      |  87 +++++++++
 7 files changed, 574 insertions(+)

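A note on the start request documented in the README in this commit: the `config` query value is raw JSON, and values such as an `eventTime` of `2022-04-24 16:12:00` contain spaces and colons that are not safe to place in a URL unencoded. The following is a minimal sketch, not part of this commit, of building the request URL with the JSON percent-encoded. `StartRequestSketch`, `toJson`, and `startUrl` are hypothetical names, and the sketch assumes the runtime decodes the query string in the usual form-encoded way.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

final class StartRequestSketch {

    // Escapes only backslashes and double quotes; enough for simple config values.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Serializes a flat String-to-String map as a JSON object, keeping insertion order.
    static String toJson(Map<String, String> config) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
        }
        return sb.append('}').toString();
    }

    // Builds http://<runtime>/connectors/<name>?config=<url-encoded JSON>.
    static String startUrl(String runtime, String name, Map<String, String> config) {
        try {
            return "http://" + runtime + "/connectors/" + name
                    + "?config=" + URLEncoder.encode(toJson(config), "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector-class",
                "org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector");
        config.put("connect-topicname", "eventbridge-topic");
        config.put("eventTime", "2022-04-24 16:12:00");
        System.out.println(startUrl("localhost:8081", "eventbridgeConnectorSink", config));
    }
}
```

Generating the JSON from a map also avoids the hand-typed quoting mistakes that are easy to make when the config is written inline in a URL.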
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
new file mode 100644
index 0000000..d4eca92
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -0,0 +1,53 @@
+# rocketmq-connect-eventbridge
+* **rocketmq-connect-eventbridge** description
+```
+Consumes messages sent by producers and writes the data to EventBridge.
+```
+
+## rocketmq-connect-eventbridge packaging
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## rocketmq-connect-eventbridge startup
+
+* **eventbridge-sink-connector** startup
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector","connect-topicname":"${connect-topicname}"
+,"accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","accountEndpoint":"${accountEndpoint}","eventId":"${eventId}","eventSource":"${eventSource}","eventType":"${eventType}",
+"eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+```
+
+Example
+```
+http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
+"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector","connect-topicname":"eventbridge-topic","accessKeyId":"xxxx","accessKeySecret":"xxxx","accountEndpoint":"xxxx","eventId":"xxxx",
+"eventSource":"xxxx","eventType":"", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
+```
+
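+>**Note:** Starting `rocketmq-connect-eventbridge` depends on a running `rocketmq-connect-runtime` project. Place all of the packaged `jar` files in the path configured as `pluginPaths` in the `runtime` project before issuing the start request above; this value is set in the `connect.conf` file of the `runtime` project.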
+
+## rocketmq-connect-eventbridge stop
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector-name}/stop
+```
+
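+## rocketmq-connect-eventbridge parameters
+* **eventbridge-sink-connector parameters**
+
+| KEY                    | TYPE    | Required | Description                                                                                  | Example |
+|------------------------|---------|----------|----------------------------------------------------------------------------------------------|---------|
+| accessKeyId            | String  | YES      | Alibaba Cloud credential, obtained from the Alibaba Cloud user information management console | xxxx |
+| accessKeySecret        | String  | YES      | Alibaba Cloud credential, obtained from the Alibaba Cloud user information management console | xxx |
+| accountEndpoint        | String  | YES      | Official Alibaba Cloud EventBridge endpoint                                                  | xxxx |
+| eventId                | String  | YES      | Event ID                                                                                     | xxxx |
+| eventSource            | String  | YES      | Event source                                                                                 | xxxx |
+| eventType              | String  | YES      | Event type                                                                                   | null |
+| eventTime              | String  | YES      | Time at which the event was produced, in `yyyy-MM-dd HH:mm:ss` format                        | xxxx |
+| eventSubject           | String  | NO       | Event subject                                                                                | xxxx |
+| aliyuneventbusname     | String  | YES      | Name of the event bus that receives the events                                               | xxxx |
+| connect-topicname      | String  | YES      | Topic whose messages the sink processes                                                      | xxxx |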
+
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
new file mode 100644
index 0000000..469d1fe
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
@@ -0,0 +1,194 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-eventbridge</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>connect-eventbridge</name>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <slf4j.version>1.7.36</slf4j.version>
+        <logback.version>1.2.11</logback.version>
+        <junit.version>4.13.2</junit.version>
+        <assertj.version>3.22.0</assertj.version>
+        <mockito.version>4.4.0</mockito.version>
+        <openmessaging-connector.version>0.1.2</openmessaging-connector.version>
+        <commons-lang3.version>3.12.0</commons-lang3.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <eventbridge-client.version>1.3.5</eventbridge-client.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>${openmessaging-connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>eventbridge-client</artifactId>
+            <version>${eventbridge-client.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
new file mode 100644
index 0000000..f986a0a
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -0,0 +1,96 @@
+package org.apache.rocketmq.connect.eventbridge.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventBridgeSinkConnector extends SinkConnector {
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String accountEndpoint;
+
+    private String eventId;
+
+    private String eventSource;
+
+    private String eventTime;
+
+    private String eventType;
+
+    private String eventSubject;
+
+    private String aliyuneventbusname;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> keyValueList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, accessKeyId);
+        keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
+        keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
+        keyValue.put(EventBridgeConstant.EVENT_ID, eventId);
+        keyValue.put(EventBridgeConstant.EVENT_SOURCE, eventSource);
+        keyValue.put(EventBridgeConstant.EVENT_TIME, eventTime);
+        keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
+        keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
+        keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
+        keyValueList.add(keyValue);
+        return keyValueList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return EventBridgeSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_ID))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_SECRET))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_ID))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SOURCE))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TIME))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))) {
+            throw new RuntimeException("EventBridge required parameter is null !");
+        }
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
+        accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
+        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+        eventId = config.getString(EventBridgeConstant.EVENT_ID);
+        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+        eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
+        eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
+        aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+    }
+
+    @Override
+    public void stop() {
+
+    }
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
new file mode 100644
index 0000000..3af4564
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -0,0 +1,114 @@
+package org.apache.rocketmq.connect.eventbridge.sink;
+
+import com.aliyun.eventbridge.EventBridgeClient;
+import com.aliyun.eventbridge.models.CloudEvent;
+import com.aliyun.eventbridge.models.Config;
+import com.aliyun.eventbridge.models.PutEventsResponse;
+import com.aliyun.eventbridge.util.EventBuilder;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventBridgeSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(EventBridgeSinkTask.class);
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String accountEndpoint;
+
+    private String eventId;
+
+    private String eventSource;
+
+    private String eventTime;
+
+    private String eventType;
+
+    private String eventSubject;
+
+    private String aliyuneventbusname;
+
+    private EventBridgeClient eventBridgeClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        List<CloudEvent> cloudEventList = new ArrayList<>();
+        try {
+            sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
+                    .withId(eventId)
+                    .withSource(URI.create(eventSource))
+                    .withType(eventType)
+                    .withSubject(eventSubject)
+                    .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
+                    .withJsonStringData(connectRecord.getData().toString())
+                    .withAliyunEventBus(aliyuneventbusname)
+                    .build()));
+            PutEventsResponse putEventsResponse = eventBridgeClient.putEvents(cloudEventList);
+            log.info("EventBridgeSinkTask | put | putEventsResponse | entryList : {} | requestId : {}", putEventsResponse.getEntryList(), putEventsResponse.getRequestId());
+        } catch (Exception e) {
+            log.error("EventBridgeSinkTask | put | error => ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
+        accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
+        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+        eventId = config.getString(EventBridgeConstant.EVENT_ID);
+        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+        eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
+        eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
+        aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+    }
+
+    @Override
+    public void start(SinkTaskContext sinkTaskContext) {
+        super.start(sinkTaskContext);
+        try {
+            Config authConfig = new Config();
+            authConfig.accessKeyId = accessKeyId;
+            authConfig.accessKeySecret = accessKeySecret;
+            authConfig.endpoint = accountEndpoint;
+            eventBridgeClient = new EventBridgeClient(authConfig);
+        } catch (Exception e) {
+            log.error("EventBridgeSinkTask | start | error => ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        eventBridgeClient = null;
+    }
+
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
new file mode 100644
index 0000000..51a7508
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
@@ -0,0 +1,14 @@
+package org.apache.rocketmq.connect.eventbridge.sink.constant;
+
+public class EventBridgeConstant {
+
+    public static final String ACCESS_KEY_ID = "accessKeyId";
+    public static final String ACCESS_KEY_SECRET = "accessKeySecret";
+    public static final String ACCOUNT_ENDPOINT = "accountEndpoint";
+    public static final String EVENT_ID = "eventId";
+    public static final String EVENT_SOURCE = "eventSource";
+    public static final String EVENT_TIME = "eventTime";
+    public static final String EVENT_TYPE = "eventType";
+    public static final String EVENT_SUBJECT = "eventSubject";
+    public static final String ALIYUN_EVENT_BUS_NAME = "aliyuneventbusname";
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
new file mode 100644
index 0000000..c361653
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
@@ -0,0 +1,16 @@
+package org.apache.rocketmq.connect.eventbridge.sink.utils;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+
+public class DateUtils {
+    public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+    public static Date getDate(String date, String dateFormat) {
+        final LocalDateTime parse = LocalDateTime.parse(date, DateTimeFormatter.ofPattern(dateFormat));
+        return Date.from(parse.atZone(ZoneId.systemDefault()).toInstant());
+    }
+
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
new file mode 100644
index 0000000..030794c
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -0,0 +1,87 @@
+package org.apache.rocketmq.connect.eventbridge.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+
+public class EventBridgeSinkTest {
+
+    @Test
+    public void testTaskConfigs() {
+        EventBridgeSinkConnector eventBridgeSinkConnector = new EventBridgeSinkConnector();
+        Assert.assertEquals(eventBridgeSinkConnector.taskConfigs(1).size(), 1);
+    }
+
+    @Test
+    public void testPut() {
+        EventBridgeSinkTask eventBridgeSinkTask = new EventBridgeSinkTask();
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, "xxxx");
+        keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
+        keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, "xxxx");
+        keyValue.put(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
+        keyValue.put(EventBridgeConstant.EVENT_SOURCE, "xxxx");
+        keyValue.put(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
+        keyValue.put(EventBridgeConstant.EVENT_TYPE, "xxxx");
+        keyValue.put(EventBridgeConstant.EVENT_SUBJECT, "xxxx");
+        keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, "xxxx");
+        eventBridgeSinkTask.init(keyValue);
+        List<ConnectRecord> connectRecordList = new ArrayList<>();
+        ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
+        connectRecord.setData("{\n" +
+                "\t\"test\" :  \"test\"\n" +
+                "}");
+        connectRecordList.add(connectRecord);
+        eventBridgeSinkTask.start(new SinkTaskContext() {
+            @Override
+            public String getConnectorName() {
+                return null;
+            }
+
+            @Override
+            public String getTaskName() {
+                return null;
+            }
+
+            @Override
+            public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
+
+            }
+
+            @Override
+            public void resetOffset(Map<RecordPartition, RecordOffset> map) {
+
+            }
+
+            @Override
+            public void pause(List<RecordPartition> list) {
+
+            }
+
+            @Override
+            public void resume(List<RecordPartition> list) {
+
+            }
+
+            @Override
+            public Set<RecordPartition> assignment() {
+                return null;
+            }
+        });
+        eventBridgeSinkTask.put(connectRecordList);
+    }
+
+}
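
Note on the DateUtils helper added in this revision: getDate parses a local date-time string with the given pattern and interprets it in the JVM's default time zone. The following is a minimal, self-contained sketch of that same java.time call sequence, not the connector's code. The class name EventTimeParseSketch and the method name parse are hypothetical and chosen only for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Date;

// Hypothetical stand-in for the DateUtils helper shown in the diff above.
final class EventTimeParseSketch {
    static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    // Parses a zone-less timestamp and pins it to the JVM's default zone.
    static Date parse(String text, String pattern) {
        LocalDateTime local = LocalDateTime.parse(text, DateTimeFormatter.ofPattern(pattern));
        return Date.from(local.atZone(ZoneId.systemDefault()).toInstant());
    }

    public static void main(String[] args) {
        // The same literal the unit test passes as the event time.
        System.out.println(parse("2022-04-24 16:12:00", DEFAULT_DATE_FORMAT));

        // Input that does not match the pattern throws an unchecked exception.
        try {
            parse("2022-04-24T16:12:00", DEFAULT_DATE_FORMAT);
        } catch (DateTimeParseException e) {
            System.out.println("rejected: " + e.getParsedString());
        }
    }
}
```

Because the input string carries no zone, the resulting instant depends on where the task runs; two workers in different time zones would map the same eventTime string to different instants.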


[rocketmq-connect] 10/10: update README.md

Posted by yu...@apache.org.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit ffa4cdb416016102c9367ff540381d3cd265d352
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Thu Jun 2 15:21:58 2022 +0800

    update README.md
---
 .../aliyun/rocketmq-connect-eventbridge/README.md       |  7 ++-----
 .../eventbridge/sink/EventBridgeSinkConnector.java      | 17 +----------------
 .../connect/eventbridge/sink/EventBridgeSinkTask.java   | 16 ++++------------
 .../connect/eventbridge/sink/utils/DateUtils.java       |  6 ------
 .../connect/eventbridge/sink/EventBridgeSinkTest.java   |  2 +-
 5 files changed, 8 insertions(+), 40 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index 4d3b966..c9a5251 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,14 +16,14 @@ mvn clean install -Dmaven.test.skip=true
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
 ?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","eventType":"${eventType}","eventSource":"${eventSource}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
 ```
 
 Example
 ```
 http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
 "connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","stsEndpoint”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
-roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "eventType":"","eventSource":"","aliyuneventbusname":"xxxx"}
+roleArn”:"xxxx", "roleSessionName":"xxxx", "aliyuneventbusname":"xxxx","eventSubject":""}
 ```
 
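 >**Note:** Starting `rocketmq-eventbridge-connect` depends on the `rocketmq-connect-runtime` project being started. Place all the built `jar` packages in the path configured by `pluginPaths` in the `runtime` project before issuing the start request above; this value is set in the `connect.conf` file of the `runtime` project.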
@@ -45,10 +45,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
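 |accessKeySecret         | String  | YES             | SK (AccessKey secret) of the authorized Alibaba Cloud account                     | xxx |
 |roleArn                 | String  | NO             | ARN of the RAM role to assume. The role must be a RAM role whose trusted entity is an Alibaba Cloud account                     | xxxx |
 |roleSessionName         | String  | NO             | Role session name. This is a user-defined parameter, usually set to the identity of the user calling the API                  | user name |
-|eventTime               | String  | NO              | Time when the event occurred                          | xxxx |
 |eventSubject            | String  | YES             | Event subject                          | xxxx |
-|eventType               | String  | YES             | Event type                          | xxxx |
-|eventSource             | String  | YES             | Event source                          | xxxx |
 |aliyuneventbusname      | String  | YES             | Name of the event bus that receives the events                          | xxxx |
 |connect-topicname       | String  | YES             | Topic whose messages the sink processes                     | xxxx |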
 
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 0f4af2f..57cdbb7 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -6,7 +6,6 @@ import io.openmessaging.connector.api.component.task.sink.SinkConnector;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
-import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -21,14 +20,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
 
     private String roleSessionName;
 
-    private String eventTime;
-
-    private String eventSource;
-
     private String eventSubject;
 
-    private String eventType;
-
     private String aliyuneventbusname;
 
     private String accountEndpoint;
@@ -53,13 +46,10 @@ public class EventBridgeSinkConnector extends SinkConnector {
         keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
         keyValue.put(EventBridgeConstant.ROLE_ARN, roleArn);
         keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, roleSessionName);
-        keyValue.put(EventBridgeConstant.EVENT_TIME, eventTime);
         keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
         keyValue.put(EventBridgeConstant.STS_ENDPOINT, stsEndpoint);
-        keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
-        keyValue.put(EventBridgeConstant.EVENT_SOURCE, eventSource);
         keyValueList.add(keyValue);
         return keyValueList;
     }
@@ -75,9 +65,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_SECRET))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SUBJECT))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SOURCE))) {
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SUBJECT))) {
             throw new RuntimeException("EventBridge required parameter is null !");
         }
     }
@@ -88,13 +76,10 @@ public class EventBridgeSinkConnector extends SinkConnector {
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
         roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
         roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
-        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, DateUtils.getDate());
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
         stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
-        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
-        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index 95880b2..5c58473 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 public class EventBridgeSinkTask extends SinkTask {
@@ -39,14 +40,8 @@ public class EventBridgeSinkTask extends SinkTask {
 
     private String roleSessionName;
 
-    private String eventTime;
-
     private String eventSubject;
 
-    private String eventType;
-
-    private String eventSource;
-
     private String aliyuneventbusname;
 
     private String accountEndpoint;
@@ -59,10 +54,10 @@ public class EventBridgeSinkTask extends SinkTask {
         try {
             sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
                     .withId(connectRecord.getExtension(EventBridgeConstant.EVENT_ID))
-                    .withSource(CheckUtils.checkNull(eventSource) ? URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)) : URI.create(eventSource))
-                    .withType(CheckUtils.checkNull(eventType) ? connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE) : eventType)
+                    .withSource(URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)))
+                    .withType(connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE))
                     .withSubject(eventSubject)
-                    .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
+                    .withTime(CheckUtils.checkNull(connectRecord.getExtension(EventBridgeConstant.EVENT_TIME)) ? new Date() : DateUtils.getDate(connectRecord.getExtension(EventBridgeConstant.EVENT_TIME), DateUtils.DEFAULT_DATE_FORMAT))
                     .withJsonStringData(connectRecord.getData().toString())
                     .withAliyunEventBus(aliyuneventbusname)
                     .build()));
@@ -95,13 +90,10 @@ public class EventBridgeSinkTask extends SinkTask {
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
         roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
         roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
-        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, DateUtils.getDate());
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
         stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
-        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
-        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
index 380f14e..b90911b 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
@@ -3,7 +3,6 @@ package org.apache.rocketmq.connect.eventbridge.sink.utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
@@ -21,9 +20,4 @@ public class DateUtils {
         return null;
     }
 
-    public static String getDate() {
-        DateFormat dateFormat = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
-        return dateFormat.format(new Date());
-    }
-
 }
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
index 670fd87..3561b4a 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -35,7 +35,6 @@ public class EventBridgeSinkTest {
         keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
         keyValue.put(EventBridgeConstant.ROLE_ARN, "xxxx");
         keyValue.put(EventBridgeConstant.ROLE_SESSION_NAME, "xxxx");
-        keyValue.put(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
         keyValue.put(EventBridgeConstant.EVENT_SUBJECT, "xxxx");
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, "xxxx");
         eventBridgeSinkTask.init(keyValue);
@@ -47,6 +46,7 @@ public class EventBridgeSinkTest {
         connectRecord.addExtension(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
         connectRecord.addExtension(EventBridgeConstant.EVENT_SOURCE, "xxxx");
         connectRecord.addExtension(EventBridgeConstant.EVENT_TYPE, "xxxx");
+        connectRecord.addExtension(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
         connectRecordList.add(connectRecord);
         eventBridgeSinkTask.start(new SinkTaskContext() {
             @Override
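
Note on the change above: after this commit the task no longer takes eventTime, eventType and eventSource from the connector config. It reads them from each record's extensions, and for the time it falls back to the current date when the extension is missing or null-like. The sketch below illustrates only that fallback, under stated assumptions: a plain Map stands in for the record extensions, the names RecordEventTimeSketch, isNullLike and resolveEventTime are hypothetical, and treating an unparseable value as "use the fallback" is this sketch's choice rather than something the diff confirms.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; a Map stands in for ConnectRecord extensions.
final class RecordEventTimeSketch {
    static final String EVENT_TIME = "eventTime";
    static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    // Treats null, blank, and the literal text "null" as "no value".
    static boolean isNullLike(String value) {
        return value == null || value.trim().isEmpty() || "null".equalsIgnoreCase(value);
    }

    // Uses the record's own event time when present, otherwise the fallback.
    static Date resolveEventTime(Map<String, String> extensions, Date fallback) {
        String raw = extensions.get(EVENT_TIME);
        if (isNullLike(raw)) {
            return fallback;
        }
        try {
            // A new formatter per call, since SimpleDateFormat is not thread-safe.
            return new SimpleDateFormat(DEFAULT_DATE_FORMAT).parse(raw);
        } catch (ParseException e) {
            return fallback; // assumption of this sketch: fall back rather than fail
        }
    }

    public static void main(String[] args) {
        Map<String, String> extensions = new HashMap<>();
        System.out.println(resolveEventTime(extensions, new Date())); // no extension: current time

        extensions.put(EVENT_TIME, "2022-04-24 16:12:00");
        System.out.println(resolveEventTime(extensions, new Date())); // parsed from the record
    }
}
```

Moving the timestamp to the record level means each event can carry its own time instead of every event sharing the single value fixed when the connector was configured.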


[rocketmq-connect] 09/10: fix "null" check

Posted by yu...@apache.org.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 650b169db61afa4c5629b97b30098d51be23f646
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Thu Jun 2 11:23:27 2022 +0800

    fix "null" check
---
 .../aliyun/rocketmq-connect-eventbridge/README.md  |  5 +++--
 .../eventbridge/sink/EventBridgeSinkConnector.java |  7 +++++-
 .../eventbridge/sink/EventBridgeSinkTask.java      | 11 ++++++----
 .../sink/constant/EventBridgeConstant.java         |  4 ++--
 .../connect/eventbridge/sink/utils/CheckUtils.java | 25 ++++++++++++++++++++++
 5 files changed, 43 insertions(+), 9 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index 6aec62f..4d3b966 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,14 +16,14 @@ mvn clean install -Dmaven.test.skip=true
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
 ?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
-,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","eventType":"${eventType}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"stsEndpoint”:"${stsEndpoint}",accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",roleArn”:"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","eventType":"${eventType}","eventSource":"${eventSource}","aliyuneventbusname":"${aliyuneventbusname}"}
 ```
 
 Example
 ```
 http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
 "connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic","stsEndpoint”:"xxxx",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",
-roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "eventType":"","aliyuneventbusname":"xxxx"}
+roleArn”:"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "eventType":"","eventSource":"","aliyuneventbusname":"xxxx"}
 ```
 
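 >**Note:** Starting `rocketmq-eventbridge-connect` depends on the `rocketmq-connect-runtime` project being started. Place all the built `jar` packages in the path configured by `pluginPaths` in the `runtime` project before issuing the start request above; this value is set in the `connect.conf` file of the `runtime` project.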
@@ -48,6 +48,7 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
 |eventTime               | String  | NO              | Time when the event occurred                          | xxxx |
 |eventSubject            | String  | YES             | Event subject                          | xxxx |
 |eventType               | String  | YES             | Event type                          | xxxx |
+|eventSource             | String  | YES             | Event source                          | xxxx |
 |aliyuneventbusname      | String  | YES             | Name of the event bus that receives the events                          | xxxx |
 |connect-topicname       | String  | YES             | Topic whose messages the sink processes                     | xxxx |
 
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 254567a..0f4af2f 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -23,6 +23,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
 
     private String eventTime;
 
+    private String eventSource;
+
     private String eventSubject;
 
     private String eventType;
@@ -57,6 +59,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
         keyValue.put(EventBridgeConstant.STS_ENDPOINT, stsEndpoint);
         keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
+        keyValue.put(EventBridgeConstant.EVENT_SOURCE, eventSource);
         keyValueList.add(keyValue);
         return keyValueList;
     }
@@ -73,7 +76,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SUBJECT))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))) {
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SOURCE))) {
             throw new RuntimeException("EventBridge required parameter is null !");
         }
     }
@@ -90,6 +94,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
         stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
         eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
+        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index 4a4fc74..95880b2 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -16,8 +16,8 @@ import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+import org.apache.rocketmq.connect.eventbridge.sink.utils.CheckUtils;
 import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +45,8 @@ public class EventBridgeSinkTask extends SinkTask {
 
     private String eventType;
 
+    private String eventSource;
+
     private String aliyuneventbusname;
 
     private String accountEndpoint;
@@ -57,8 +59,8 @@ public class EventBridgeSinkTask extends SinkTask {
         try {
             sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
                     .withId(connectRecord.getExtension(EventBridgeConstant.EVENT_ID))
-                    .withSource(URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)))
-                    .withType(StringUtils.isBlank(eventType) ? connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE) : eventType)
+                    .withSource(CheckUtils.checkNull(eventSource) ? URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)) : URI.create(eventSource))
+                    .withType(CheckUtils.checkNull(eventType) ? connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE) : eventType)
                     .withSubject(eventSubject)
                     .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
                     .withJsonStringData(connectRecord.getData().toString())
@@ -99,6 +101,7 @@ public class EventBridgeSinkTask extends SinkTask {
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
         stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
         eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
+        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
     }
 
     @Override
@@ -106,7 +109,7 @@ public class EventBridgeSinkTask extends SinkTask {
         super.start(sinkTaskContext);
         try {
             Config authConfig = new Config();
-            if (StringUtils.isNotBlank(roleArn) && StringUtils.isNotBlank(roleSessionName)) {
+            if (CheckUtils.checkNotNull(roleArn) && CheckUtils.checkNotNull(roleSessionName)) {
                 DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
                 DefaultProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret);
                 IAcsClient client = new DefaultAcsClient(profile);
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
index 8c96842..2d65eb9 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
@@ -8,9 +8,9 @@ public class EventBridgeConstant {
     public static final String ROLE_ARN = "roleArn";
     public static final String ROLE_SESSION_NAME = "roleSessionName";
     public static final String EVENT_ID = "id";
-    public static final String EVENT_SOURCE = "source";
+    public static final String EVENT_SOURCE = "eventSource";
     public static final String EVENT_TIME = "eventTime";
-    public static final String EVENT_TYPE = "type";
+    public static final String EVENT_TYPE = "eventType";
     public static final String EVENT_SUBJECT = "eventSubject";
     public static final String ALIYUN_EVENT_BUS_NAME = "aliyuneventbusname";
     public static final String STS_ENDPOINT = "stsEndpoint";
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/CheckUtils.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/CheckUtils.java
new file mode 100644
index 0000000..067cca4
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/CheckUtils.java
@@ -0,0 +1,25 @@
+package org.apache.rocketmq.connect.eventbridge.sink.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+public class CheckUtils {
+    private static final String NULL_CONSTANT = "null";
+
+    public static Boolean checkNull(String check) {
+        if (StringUtils.isBlank(check)) {
+            return Boolean.TRUE;
+        }
+        if (StringUtils.isNotBlank(check) && NULL_CONSTANT.equalsIgnoreCase(check)) {
+            return Boolean.TRUE;
+        }
+        return Boolean.FALSE;
+    }
+
+    public static Boolean checkNotNull(String check) {
+        if (StringUtils.isNotBlank(check) && !NULL_CONSTANT.equalsIgnoreCase(check)) {
+            return Boolean.TRUE;
+        }
+        return Boolean.FALSE;
+    }
+
+}
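
Note on CheckUtils: the two helpers treat a value as absent when it is null, blank, or the literal text "null" in any letter case, presumably because an unset config value can arrive as that string. The sketch below mirrors this behaviour without the commons-lang3 dependency so it can run standalone. The class name NullCheckSketch and its isBlank helper are hypothetical stand-ins, and it returns primitive boolean rather than the boxed Boolean used in the diff.

```java
// Hypothetical, dependency-free mirror of the CheckUtils logic in the diff above.
final class NullCheckSketch {
    private static final String NULL_CONSTANT = "null";

    // Stand-in for StringUtils.isBlank: null, empty, or whitespace only.
    static boolean isBlank(String s) {
        return s == null || s.chars().allMatch(Character::isWhitespace);
    }

    // True when the value is missing or is the text "null" (case-insensitive).
    static boolean checkNull(String check) {
        return isBlank(check) || NULL_CONSTANT.equalsIgnoreCase(check);
    }

    // Exact negation of checkNull, so the two can never disagree.
    static boolean checkNotNull(String check) {
        return !checkNull(check);
    }

    public static void main(String[] args) {
        String[] samples = {null, "", "   ", "null", "NULL", "acs:ram::1:role/demo"};
        for (String sample : samples) {
            System.out.println("[" + sample + "] checkNull=" + checkNull(sample)
                    + " checkNotNull=" + checkNotNull(sample));
        }
    }
}
```

Defining checkNotNull as the negation of checkNull keeps the pair consistent by construction; the version in the diff spells out both conditions separately, which gives the same results for these inputs.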


[rocketmq-connect] 05/10: update eventTime

Posted by yu...@apache.org.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 5228be05556811c2e80c8b64cd5174066efce98e
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Tue May 31 11:18:55 2022 +0800

    update eventTime
---
 .../eventbridge/sink/EventBridgeSinkConnector.java |  4 ++--
 .../eventbridge/sink/EventBridgeSinkTask.java      |  3 +--
 .../connect/eventbridge/sink/utils/DateUtils.java  | 25 ++++++++++++++++------
 3 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index cc0d0a5..7a22997 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -6,9 +6,9 @@ import io.openmessaging.connector.api.component.task.sink.SinkConnector;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
 
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 
 public class EventBridgeSinkConnector extends SinkConnector {
@@ -86,7 +86,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
         roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
         roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
-        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, new Date().toString());
+        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, DateUtils.getDate());
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index c1ce188..4a4fc74 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 
 public class EventBridgeSinkTask extends SinkTask {
@@ -94,7 +93,7 @@ public class EventBridgeSinkTask extends SinkTask {
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
         roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
         roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
-        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, new Date().toString());
+        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, DateUtils.getDate());
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
index c361653..380f14e 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
@@ -1,16 +1,29 @@
 package org.apache.rocketmq.connect.eventbridge.sink.utils;
 
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.Date;
 
 public class DateUtils {
+    private static final Logger log = LoggerFactory.getLogger(DateUtils.class);
     public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
 
-    public static Date getDate(String date, String dateFormat) {
-        final LocalDateTime parse = LocalDateTime.parse(date, DateTimeFormatter.ofPattern(dateFormat));
-        return Date.from(parse.atZone(ZoneId.systemDefault()).toInstant());
+    public static synchronized Date getDate(String date, String dateFormat) {
+        try {
+            SimpleDateFormat simpleDateFormat = new SimpleDateFormat(dateFormat);
+            return simpleDateFormat.parse(date);
+        } catch (Exception e) {
+            log.error("DateUtils | getDate | error => ", e);
+        }
+        return null;
+    }
+
+    public static String getDate() {
+        DateFormat dateFormat = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
+        return dateFormat.format(new Date());
     }
 
 }
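
Note on the eventTime default above: the sink task later parses eventTime with DateUtils.DEFAULT_DATE_FORMAT, so a default produced by new Date().toString() does not match that pattern, while one produced with the same pattern does. A minimal sketch of that round trip follows. The class and method names are hypothetical and not part of the commit, and it uses SimpleDateFormat directly rather than the connector's DateUtils.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EventTimeRoundTrip {
    // Same pattern as DateUtils.DEFAULT_DATE_FORMAT in the diff above.
    static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    // Formats the current time with the pattern that is later used for parsing.
    static String formatNow() {
        return new SimpleDateFormat(DEFAULT_DATE_FORMAT).format(new Date());
    }

    // Returns the parsed date, or null when the text does not match the pattern.
    static Date parseOrNull(String text) {
        try {
            return new SimpleDateFormat(DEFAULT_DATE_FORMAT).parse(text);
        } catch (ParseException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // A default built with the shared pattern parses back successfully.
        System.out.println(parseOrNull(formatNow()) != null);
        // The previous default, Date.toString(), starts with a weekday name and is rejected.
        System.out.println(parseOrNull(new Date().toString()) != null);
    }
}
```

A new SimpleDateFormat is created per call here because the class is not thread-safe. A shared java.time.format.DateTimeFormatter would be an alternative.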


[rocketmq-connect] 04/10: add eventType

Posted by yu...@apache.org.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit d516617faf27d2b671ada1c557ae89a73d0cc808
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Tue May 31 10:39:54 2022 +0800

    add eventType
---
 .../aliyun/rocketmq-connect-eventbridge/README.md  | 23 +++++++------
 .../eventbridge/sink/EventBridgeSinkConnector.java |  7 +++-
 .../eventbridge/sink/EventBridgeSinkTask.java      | 40 ++++++++++++++--------
 3 files changed, 43 insertions(+), 27 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index f22b5fe..8e5ab33 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -16,14 +16,14 @@ mvn clean install -Dmaven.test.skip=true
 ```
 http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
 ?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector","connect-topicname":"${connect-topicname}"
-,"stsEndpoint":"${stsEndpoint}","accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","roleArn":"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+,"stsEndpoint":"${stsEndpoint}","accessKeyId":"${accessKeyId}","accessKeySecret":"${accessKeySecret}","roleArn":"${roleArn}", "roleSessionName":"${roleSessionName}", "eventTime":"${eventTime}","eventSubject":"${eventSubject}","eventType":"${eventType}","aliyuneventbusname":"${aliyuneventbusname}"}
 ```
 
 Example
 ```
 http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
 "connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector","connect-topicname":"eventbridge-topic","stsEndpoint":"xxxx","accessKeyId":"xxxx","accessKeySecret":"xxxx",
-"roleArn":"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
+"roleArn":"xxxx", "roleSessionName":"xxxx", "eventTime":"xxxx","eventSubject":"", "eventType":"","aliyuneventbusname":"xxxx"}
 ```
 
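 >**Note:** Starting `rocketmq-eventbridge-connect` depends on the `rocketmq-connect-runtime` project being started. Place all built `jar` packages in the path configured by `pluginPaths` in the `runtime` project before issuing the start request above. This value is set in the `connect.conf` file of the `runtime` project.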
@@ -39,13 +39,14 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
 
 |         KEY            |  TYPE   | Must be filled | Description                      | Example
 |------------------------|---------|----------------|----------------------------------|--|
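-|stsEndpoint             | String  | YES            | STS endpoint                       | xxxx|
-|accessKeyId             | String  | YES            | AccessKey ID (AK) of the authorized Alibaba Cloud account | xxxx |
-|accessKeySecret         | String  | YES            | AccessKey secret (SK) of the authorized Alibaba Cloud account | xxx |
-|roleArn                 | String  | YES            | ARN of the RAM role to assume. The role must be a RAM role whose trusted entity is an Alibaba Cloud account | xxxx |
-|roleSessionName         | String  | YES            | Role session name. A user-defined value, usually set to the identity of the user calling the API | username |
-|eventTime               | String | YES             | Time at which the event occurred | xxxx |
-|eventSubject            | String | NO              | Event subject                    | xxxx |
-|aliyuneventbusname      | String | YES             | Name of the event bus that receives the events | xxxx |
-|connect-topicname       | String  | YES            | Topic whose messages the sink processes | xxxx |
+|stsEndpoint             | String  | YES             | STS endpoint                       | xxxx|
+|accessKeyId             | String  | YES             | AccessKey ID (AK) of the authorized Alibaba Cloud account | xxxx |
+|accessKeySecret         | String  | YES             | AccessKey secret (SK) of the authorized Alibaba Cloud account | xxx |
+|roleArn                 | String  | YES             | ARN of the RAM role to assume. The role must be a RAM role whose trusted entity is an Alibaba Cloud account | xxxx |
+|roleSessionName         | String  | YES             | Role session name. A user-defined value, usually set to the identity of the user calling the API | username |
+|eventTime               | String  | NO              | Time at which the event occurred | xxxx |
+|eventSubject            | String  | YES             | Event subject                    | xxxx |
+|eventType               | String  | YES             | Event type                       | xxxx |
+|aliyuneventbusname      | String  | YES             | Name of the event bus that receives the events | xxxx |
+|connect-topicname       | String  | YES             | Topic whose messages the sink processes | xxxx |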
 
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 1a7a0b1..cc0d0a5 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -8,6 +8,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
 
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 public class EventBridgeSinkConnector extends SinkConnector {
@@ -24,6 +25,8 @@ public class EventBridgeSinkConnector extends SinkConnector {
 
     private String eventSubject;
 
+    private String eventType;
+
     private String aliyuneventbusname;
 
     private String accountEndpoint;
@@ -53,6 +56,7 @@ public class EventBridgeSinkConnector extends SinkConnector {
         keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
         keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
         keyValue.put(EventBridgeConstant.STS_ENDPOINT, stsEndpoint);
+        keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
         keyValueList.add(keyValue);
         return keyValueList;
     }
@@ -82,11 +86,12 @@ public class EventBridgeSinkConnector extends SinkConnector {
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
         roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
         roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
-        eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, new Date().toString());
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
         stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
+        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
     }
 
     @Override
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
index aaeac3a..c1ce188 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -16,6 +16,7 @@ import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
 import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
 import org.slf4j.Logger;
@@ -23,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 public class EventBridgeSinkTask extends SinkTask {
@@ -42,6 +44,8 @@ public class EventBridgeSinkTask extends SinkTask {
 
     private String eventSubject;
 
+    private String eventType;
+
     private String aliyuneventbusname;
 
     private String accountEndpoint;
@@ -55,7 +59,7 @@ public class EventBridgeSinkTask extends SinkTask {
             sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
                     .withId(connectRecord.getExtension(EventBridgeConstant.EVENT_ID))
                     .withSource(URI.create(connectRecord.getExtension(EventBridgeConstant.EVENT_SOURCE)))
-                    .withType(connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE))
+                    .withType(StringUtils.isBlank(eventType) ? connectRecord.getExtension(EventBridgeConstant.EVENT_TYPE) : eventType)
                     .withSubject(eventSubject)
                     .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
                     .withJsonStringData(connectRecord.getData().toString())
@@ -90,31 +94,37 @@ public class EventBridgeSinkTask extends SinkTask {
         accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
         roleArn = config.getString(EventBridgeConstant.ROLE_ARN);
         roleSessionName = config.getString(EventBridgeConstant.ROLE_SESSION_NAME);
-        eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+        eventTime = config.getString(EventBridgeConstant.EVENT_TIME, new Date().toString());
         eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
         aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
         accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
         stsEndpoint = config.getString(EventBridgeConstant.STS_ENDPOINT);
+        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
     }
 
     @Override
     public void start(SinkTaskContext sinkTaskContext) {
         super.start(sinkTaskContext);
         try {
-            DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
-            DefaultProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret);
-            IAcsClient client = new DefaultAcsClient(profile);
-            AssumeRoleWithServiceIdentityRequest request = new AssumeRoleWithServiceIdentityRequest();
-            request.setRoleArn(roleArn);
-            request.setRoleSessionName(roleSessionName);
-            request.setAssumeRoleFor(roleSessionName);
-            request.setAcceptFormat(FormatType.JSON);
-            request.setDurationSeconds(3600L);
-            final AssumeRoleWithServiceIdentityResponse response = client.getAcsResponse(request);
             Config authConfig = new Config();
-            authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
-            authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
-            authConfig.securityToken = response.getCredentials().getSecurityToken();
+            if (StringUtils.isNotBlank(roleArn) && StringUtils.isNotBlank(roleSessionName)) {
+                DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
+                DefaultProfile profile = DefaultProfile.getProfile("", accessKeyId, accessKeySecret);
+                IAcsClient client = new DefaultAcsClient(profile);
+                AssumeRoleWithServiceIdentityRequest request = new AssumeRoleWithServiceIdentityRequest();
+                request.setRoleArn(roleArn);
+                request.setRoleSessionName(roleSessionName);
+                request.setAssumeRoleFor(roleSessionName);
+                request.setAcceptFormat(FormatType.JSON);
+                request.setDurationSeconds(3600L);
+                final AssumeRoleWithServiceIdentityResponse response = client.getAcsResponse(request);
+                authConfig.accessKeyId = response.getCredentials().getAccessKeyId();
+                authConfig.accessKeySecret = response.getCredentials().getAccessKeySecret();
+                authConfig.securityToken = response.getCredentials().getSecurityToken();
+            } else {
+                authConfig.accessKeyId = accessKeyId;
+                authConfig.accessKeySecret = accessKeySecret;
+            }
             authConfig.endpoint = accountEndpoint;
             eventBridgeClient = new EventBridgeClient(authConfig);
         } catch (Exception e) {
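
Note on the two decisions introduced by this commit: a configured eventType takes precedence over the record's own type extension, and the STS role is assumed only when both roleArn and roleSessionName are set (otherwise the access key pair is used directly). A minimal sketch of just that selection logic follows. The class and method names are hypothetical, and the local isBlank only approximates commons-lang3 StringUtils.isBlank so the example has no dependencies.

```java
public class EventBridgeSelection {
    // Approximation of StringUtils.isBlank: null, empty, or whitespace only.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // The configured event type wins; the record's type is only a fallback.
    static String resolveEventType(String configuredType, String recordType) {
        return isBlank(configuredType) ? recordType : configuredType;
    }

    // The STS role is assumed only when both role parameters are present.
    static boolean useStsRole(String roleArn, String roleSessionName) {
        return !isBlank(roleArn) && !isBlank(roleSessionName);
    }

    public static void main(String[] args) {
        System.out.println(resolveEventType("", "record.type"));        // falls back to the record
        System.out.println(resolveEventType("cfg.type", "record.type")); // configured value wins
        System.out.println(useStsRole("acs:ram::123:role/demo", null));  // session name missing
    }
}
```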


[rocketmq-connect] 08/10: update README.md

Posted by yu...@apache.org.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 090bddc1c0ba50e7fb0608baeacef0c6768ac3ce
Author: zhaohai <zh...@126.com>
AuthorDate: Thu Jun 2 08:32:00 2022 +0800

    update README.md
---
 connectors/aliyun/rocketmq-connect-eventbridge/README.md           | 7 ++++---
 .../connect/eventbridge/sink/EventBridgeSinkConnector.java         | 5 ++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
index 8e5ab33..6aec62f 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/README.md
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -39,11 +39,12 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector
 
 |         KEY            |  TYPE   | Must be filled | Description                      | Example
 |------------------------|---------|----------------|----------------------------------|--|
-|stsEndpoint             | String  | YES             | STS endpoint                       | xxxx|
+|stsEndpoint             | String  | NO              | STS endpoint                       | xxxx|
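+|accountEndpoint         | String  | YES              | Endpoint of the authorized Alibaba Cloud account | xxxx|
 |accessKeyId             | String  | YES             | AccessKey ID (AK) of the authorized Alibaba Cloud account | xxxx |
 |accessKeySecret         | String  | YES             | AccessKey secret (SK) of the authorized Alibaba Cloud account | xxx |
-|roleArn                 | String  | YES             | ARN of the RAM role to assume. The role must be a RAM role whose trusted entity is an Alibaba Cloud account | xxxx |
-|roleSessionName         | String  | YES             | Role session name. A user-defined value, usually set to the identity of the user calling the API | username |
+|roleArn                 | String  | NO             | ARN of the RAM role to assume. The role must be a RAM role whose trusted entity is an Alibaba Cloud account | xxxx |
+|roleSessionName         | String  | NO             | Role session name. A user-defined value, usually set to the identity of the user calling the API | username |
 |eventTime               | String  | NO              | Time at which the event occurred | xxxx |
 |eventSubject            | String  | YES             | Event subject                    | xxxx |
 |eventType               | String  | YES             | Event type                       | xxxx |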
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 76899ce..254567a 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -71,10 +71,9 @@ public class EventBridgeSinkConnector extends SinkConnector {
         if (StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_ID))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_SECRET))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_ARN))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_SESSION_NAME))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.STS_ENDPOINT))) {
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SUBJECT))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))) {
             throw new RuntimeException("EventBridge required parameter is null !");
         }
     }
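
Note on the validation above: after this commit the required parameters are the access key pair, the account endpoint, the event bus name, the event subject and the event type. The sketch below shows one way such a check could also report which keys are missing. It is an illustration, not the connector's code: the class name is hypothetical, a plain Map stands in for the connector's KeyValue config, and the key strings are assumed to match the README table (the actual values live in EventBridgeConstant, which is not shown here).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RequiredConfigCheck {
    // Assumed key names, taken from the README table.
    static final List<String> REQUIRED = Arrays.asList(
            "accessKeyId", "accessKeySecret", "accountEndpoint",
            "aliyuneventbusname", "eventSubject", "eventType");

    // Returns the required keys whose values are null, empty, or whitespace only.
    static List<String> missingKeys(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("accessKeyId", "xxxx");
        config.put("accessKeySecret", "xxxx");
        config.put("accountEndpoint", "xxxx");
        config.put("aliyuneventbusname", "xxxx");
        config.put("eventSubject", " ");
        // eventSubject is blank and eventType is absent, so both are reported.
        System.out.println(missingKeys(config));
    }
}
```

Naming the missing keys in the exception message would make a rejected connector config easier to diagnose than a single generic message.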


[rocketmq-connect] 07/10: update validate

Posted by yu...@apache.org.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit c621ebb96bc961486d51fdb1949777ede732f4e7
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Tue May 31 14:02:36 2022 +0800

    update validate
---
 .../rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java      | 1 -
 1 file changed, 1 deletion(-)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
index 7a22997..76899ce 100644
--- a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -73,7 +73,6 @@ public class EventBridgeSinkConnector extends SinkConnector {
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_ARN))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ROLE_SESSION_NAME))
-                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TIME))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))
                 || StringUtils.isBlank(config.getString(EventBridgeConstant.STS_ENDPOINT))) {
             throw new RuntimeException("EventBridge required parameter is null !");