You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:18 UTC
[rocketmq-connect] 01/10: add eventbridge sink
This is an automated email from the ASF dual-hosted git repository.
yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 8b8c1d1476868db9d7e45f76e8e07402567194a0
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Sun Apr 24 16:50:34 2022 +0800
add eventbridge sink
---
.../aliyun/rocketmq-connect-eventbridge/README.md | 53 ++++++
.../aliyun/rocketmq-connect-eventbridge/pom.xml | 194 +++++++++++++++++++++
.../eventbridge/sink/EventBridgeSinkConnector.java | 96 ++++++++++
.../eventbridge/sink/EventBridgeSinkTask.java | 114 ++++++++++++
.../sink/constant/EventBridgeConstant.java | 14 ++
.../connect/eventbridge/sink/utils/DateUtils.java | 16 ++
.../eventbridge/sink/EventBridgeSinkTest.java | 87 +++++++++
7 files changed, 574 insertions(+)
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
new file mode 100644
index 0000000..d4eca92
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -0,0 +1,53 @@
+# rocketmq-connect-eventbridge
+* **rocketmq-connect-eventbridge** 说明
+```
+Be responsible for consuming messages from producer and writing data to eventbridge.
+```
+
+## rocketmq-connect-eventbridge 打包
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## rocketmq-connect-eventbridge 启动
+
+* **eventbridge-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
+,accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}","eventId":"${eventId}","eventSource":"${eventSource}","eventType":"${eventType}",
+"eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+```
+
+例子
+```
+http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
+"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",accountEndpoint”:"xxxx","eventId":"xxxx",
+"eventSource":"xxxx","eventType":"", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
+```
+
+>**注:** `rocketmq-eventbridge-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-eventbridge 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector-name}/stop
+```
+
+## rocketmq-connect-eventbridge 参数说明
+* **eventbridge-sink-connector 参数说明**
+
+| KEY | TYPE | Must be filled | Description | Example
+|------------------------|---------|----------------|----------------------------------|--|
+|accessKeyId | String | YES | 阿里云身份验证,在阿里云用户信息管理控制台获取 | xxxx |
+|accessKeySecret | String | YES | 阿里云身份验证,在阿里云用户信息管理控制台获取 | xxx |
+|accountEndpoint | String | YES | 阿里云EventBridge官方接入点 | xxxx |
+|eventId | String | YES | 事件ID | xxxx |
+|eventSource | String | YES | 事件源 | xxxx |
+|eventType | String | YES | 事件类型 | null |
+|eventTime | String | YES | 事件产生的时间 | xxxx |
+|eventSubject | String | NO | 事件主题 | xxxx |
+|aliyuneventbusname | String | YES | 接收事件的事件总线名称 | xxxx |
+|connect-topicname | String | YES | sink需要处理数据消息topic | xxxx |
+
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
new file mode 100644
index 0000000..469d1fe
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
@@ -0,0 +1,194 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-eventbridge</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+
+ <name>connect-eventbridge</name>
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <slf4j.version>1.7.36</slf4j.version>
+ <logback.version>1.2.11</logback.version>
+ <junit.version>4.13.2</junit.version>
+ <assertj.version>3.22.0</assertj.version>
+ <mockito.version>4.4.0</mockito.version>
+ <openmessaging-connector.version>0.1.2</openmessaging-connector.version>
+ <commons-lang3.version>3.12.0</commons-lang3.version>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <eventbridge-client.version>1.3.5</eventbridge-client.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>${logback.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>${openmessaging-connector.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun</groupId>
+ <artifactId>eventbridge-client</artifactId>
+ <version>${eventbridge-client.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
new file mode 100644
index 0000000..f986a0a
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -0,0 +1,96 @@
+package org.apache.rocketmq.connect.eventbridge.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventBridgeSinkConnector extends SinkConnector {
+
+ private String accessKeyId;
+
+ private String accessKeySecret;
+
+ private String accountEndpoint;
+
+ private String eventId;
+
+ private String eventSource;
+
+ private String eventTime;
+
+ private String eventType;
+
+ private String eventSubject;
+
+ private String aliyuneventbusname;
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+
+ @Override
+ public List<KeyValue> taskConfigs(int maxTasks) {
+ List<KeyValue> keyValueList = new ArrayList<>(11);
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, accessKeyId);
+ keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
+ keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
+ keyValue.put(EventBridgeConstant.EVENT_ID, eventId);
+ keyValue.put(EventBridgeConstant.EVENT_SOURCE, eventSource);
+ keyValue.put(EventBridgeConstant.EVENT_TIME, eventTime);
+ keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
+ keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
+ keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
+ keyValueList.add(keyValue);
+ return keyValueList;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return EventBridgeSinkTask.class;
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+ if (StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_ID))
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_SECRET))
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_ID))
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SOURCE))
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TIME))
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))
+ || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))) {
+ throw new RuntimeException("EventBridge required parameter is null !");
+ }
+ }
+
+ @Override
+ public void init(KeyValue config) {
+ accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
+ accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
+ accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+ eventId = config.getString(EventBridgeConstant.EVENT_ID);
+ eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+ eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+ eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
+ eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
+ aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+ }
+
+ @Override
+ public void stop() {
+
+ }
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
new file mode 100644
index 0000000..3af4564
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -0,0 +1,114 @@
+package org.apache.rocketmq.connect.eventbridge.sink;
+
+import com.aliyun.eventbridge.EventBridgeClient;
+import com.aliyun.eventbridge.models.CloudEvent;
+import com.aliyun.eventbridge.models.Config;
+import com.aliyun.eventbridge.models.PutEventsResponse;
+import com.aliyun.eventbridge.util.EventBuilder;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventBridgeSinkTask extends SinkTask {
+ private static final Logger log = LoggerFactory.getLogger(EventBridgeSinkTask.class);
+
+ private String accessKeyId;
+
+ private String accessKeySecret;
+
+ private String accountEndpoint;
+
+ private String eventId;
+
+ private String eventSource;
+
+ private String eventTime;
+
+ private String eventType;
+
+ private String eventSubject;
+
+ private String aliyuneventbusname;
+
+ private EventBridgeClient eventBridgeClient;
+
+ @Override
+ public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+ List<CloudEvent> cloudEventList = new ArrayList<>();
+ try {
+ sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
+ .withId(eventId)
+ .withSource(URI.create(eventSource))
+ .withType(eventType)
+ .withSubject(eventSubject)
+ .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
+ .withJsonStringData(connectRecord.getData().toString())
+ .withAliyunEventBus(aliyuneventbusname)
+ .build()));
+ PutEventsResponse putEventsResponse = eventBridgeClient.putEvents(cloudEventList);
+ log.info("EventBridgeSinkTask | put | putEventsResponse | entryList : {} | requestId : {}", putEventsResponse.getEntryList(), putEventsResponse.getRequestId());
+ } catch (Exception e) {
+ log.error("EventBridgeSinkTask | put | error => ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void pause() {
+
+ }
+
+ @Override
+ public void resume() {
+
+ }
+
+ @Override
+ public void validate(KeyValue config) {
+
+ }
+
+ @Override
+ public void init(KeyValue config) {
+ accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
+ accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
+ accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+ eventId = config.getString(EventBridgeConstant.EVENT_ID);
+ eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+ eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+ eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
+ eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
+ aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+ }
+
+ @Override
+ public void start(SinkTaskContext sinkTaskContext) {
+ super.start(sinkTaskContext);
+ try {
+ Config authConfig = new Config();
+ authConfig.accessKeyId = accessKeyId;
+ authConfig.accessKeySecret = accessKeySecret;
+ authConfig.endpoint = accountEndpoint;
+ eventBridgeClient = new EventBridgeClient(authConfig);
+ } catch (Exception e) {
+ log.error("EventBridgeSinkTask | start | error => ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ eventBridgeClient = null;
+ }
+
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
new file mode 100644
index 0000000..51a7508
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
@@ -0,0 +1,14 @@
+package org.apache.rocketmq.connect.eventbridge.sink.constant;
+
+public class EventBridgeConstant {
+
+ public static final String ACCESS_KEY_ID = "accessKeyId";
+ public static final String ACCESS_KEY_SECRET = "accessKeySecret";
+ public static final String ACCOUNT_ENDPOINT = "accountEndpoint";
+ public static final String EVENT_ID = "eventId";
+ public static final String EVENT_SOURCE = "eventSource";
+ public static final String EVENT_TIME = "eventTime";
+ public static final String EVENT_TYPE = "eventType";
+ public static final String EVENT_SUBJECT = "eventSubject";
+ public static final String ALIYUN_EVENT_BUS_NAME = "aliyuneventbusname";
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
new file mode 100644
index 0000000..c361653
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
@@ -0,0 +1,16 @@
+package org.apache.rocketmq.connect.eventbridge.sink.utils;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+
+public class DateUtils {
+ public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+ public static Date getDate(String date, String dateFormat) {
+ final LocalDateTime parse = LocalDateTime.parse(date, DateTimeFormatter.ofPattern(dateFormat));
+ return Date.from(parse.atZone(ZoneId.systemDefault()).toInstant());
+ }
+
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
new file mode 100644
index 0000000..030794c
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -0,0 +1,87 @@
+package org.apache.rocketmq.connect.eventbridge.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+
+public class EventBridgeSinkTest {
+
+ @Test
+ public void testTaskConfigs() {
+ EventBridgeSinkConnector eventBridgeSinkConnector = new EventBridgeSinkConnector();
+ Assert.assertEquals(eventBridgeSinkConnector.taskConfigs(1).size(), 1);
+ }
+
+ @Test
+ public void testPut() {
+ EventBridgeSinkTask eventBridgeSinkTask = new EventBridgeSinkTask();
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, "xxxx");
+ keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
+ keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, "xxxx");
+ keyValue.put(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
+ keyValue.put(EventBridgeConstant.EVENT_SOURCE, "xxxx");
+ keyValue.put(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
+ keyValue.put(EventBridgeConstant.EVENT_TYPE, "xxxx");
+ keyValue.put(EventBridgeConstant.EVENT_SUBJECT, "xxxx");
+ keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, "xxxx");
+ eventBridgeSinkTask.init(keyValue);
+ List<ConnectRecord> connectRecordList = new ArrayList<>();
+ ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
+ connectRecord.setData("{\n" +
+ "\t\"test\" : \"test\"\n" +
+ "}");
+ connectRecordList.add(connectRecord);
+ eventBridgeSinkTask.start(new SinkTaskContext() {
+ @Override
+ public String getConnectorName() {
+ return null;
+ }
+
+ @Override
+ public String getTaskName() {
+ return null;
+ }
+
+ @Override
+ public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
+
+ }
+
+ @Override
+ public void resetOffset(Map<RecordPartition, RecordOffset> map) {
+
+ }
+
+ @Override
+ public void pause(List<RecordPartition> list) {
+
+ }
+
+ @Override
+ public void resume(List<RecordPartition> list) {
+
+ }
+
+ @Override
+ public Set<RecordPartition> assignment() {
+ return null;
+ }
+ });
+ eventBridgeSinkTask.put(connectRecordList);
+ }
+
+}