You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/06/02 08:58:18 UTC

[rocketmq-connect] 01/10: add eventbridge sink

This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 8b8c1d1476868db9d7e45f76e8e07402567194a0
Author: zh378814 <wb...@alibaba-inc.com>
AuthorDate: Sun Apr 24 16:50:34 2022 +0800

    add eventbridge sink
---
 .../aliyun/rocketmq-connect-eventbridge/README.md  |  53 ++++++
 .../aliyun/rocketmq-connect-eventbridge/pom.xml    | 194 +++++++++++++++++++++
 .../eventbridge/sink/EventBridgeSinkConnector.java |  96 ++++++++++
 .../eventbridge/sink/EventBridgeSinkTask.java      | 114 ++++++++++++
 .../sink/constant/EventBridgeConstant.java         |  14 ++
 .../connect/eventbridge/sink/utils/DateUtils.java  |  16 ++
 .../eventbridge/sink/EventBridgeSinkTest.java      |  87 +++++++++
 7 files changed, 574 insertions(+)

diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/README.md b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
new file mode 100644
index 0000000..d4eca92
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/README.md
@@ -0,0 +1,53 @@
+# rocketmq-connect-eventbridge
+* **rocketmq-connect-eventbridge** 说明
+```
+Be responsible for consuming messages from producer and writing data to eventbridge.
+```
+
+## rocketmq-connect-eventbridge 打包
+```
+mvn clean install -Dmaven.test.skip=true
+```
+
+## rocketmq-connect-eventbridge 启动
+
+* **eventbridge-sink-connector** 启动
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-sink-connector-name}
+?config={"source-rocketmq":"${runtime-ip}:${runtime-port}","source-cluster":"${broker-cluster}","connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"${connect-topicname}"
+,accessKeyId”:"${accessKeyId}",accessKeySecret”:"${accessKeySecret}",accountEndpoint”:"${accountEndpoint}","eventId":"${eventId}","eventSource":"${eventSource}","eventType":"${eventType}", 
+"eventTime":"${eventTime}","eventSubject":"${eventSubject}","aliyuneventbusname":"${aliyuneventbusname}"}
+```
+
+例子 
+```
+http://localhost:8081/connectors/eventbridgeConnectorSink?config={"source-rocketmq":"localhost:9876","source-cluster":"DefaultCluster",
+"connector-class":"org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector",“connect-topicname”:"eventbridge-topic",accessKeyId”:"xxxx",accessKeySecret”:"xxxx",accountEndpoint”:"xxxx","eventId":"xxxx",
+"eventSource":"xxxx","eventType":"", "eventTime":"xxxx","eventSubject":"", "aliyuneventbusname":"xxxx"}
+```
+
+>**注:** `rocketmq-eventbridge-connect` 的启动依赖于`rocketmq-connect-runtime`项目的启动,需将打好的所有`jar`包放置到`runtime`项目中`pluginPaths`配置的路径后再执行上面的启动请求,该值配置在`runtime`项目下的`connect.conf`文件中
+
+## rocketmq-connect-eventbridge 停止
+
+```
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-eventbridge-connector-name}/stop
+```
+
+## rocketmq-connect-eventbridge 参数说明
+* **eventbridge-sink-connector 参数说明**
+
+|         KEY            |  TYPE   | Must be filled | Description                      | Example
+|------------------------|---------|----------------|----------------------------------|--|
+|accessKeyId             | String  | YES            | 阿里云身份验证,在阿里云用户信息管理控制台获取                    | xxxx |
+|accessKeySecret         | String  | YES            | 阿里云身份验证,在阿里云用户信息管理控制台获取                     | xxx |
+|accountEndpoint         | String  | YES            | 阿里云EventBridge官方接入点                     | xxxx |
+|eventId                 | String  | YES            | 事件ID | xxxx |
+|eventSource             | String  | YES            | 事件源 | xxxx |
+|eventType               | String | YES             | 事件类型                           | null |
+|eventTime               | String | YES             | 事件产生的时间                          | xxxx |
+|eventSubject            | String | NO              | 事件主题                          | xxxx |
+|aliyuneventbusname      | String | YES             | 接收事件的事件总线名称                          | xxxx |
+|connect-topicname       | String  | YES            | sink需要处理数据消息topic                     | xxxx |
+
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
new file mode 100644
index 0000000..469d1fe
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/pom.xml
@@ -0,0 +1,194 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-eventbridge</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>connect-eventbridge</name>
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <slf4j.version>1.7.36</slf4j.version>
+        <logback.version>1.2.11</logback.version>
+        <junit.version>4.13.2</junit.version>
+        <assertj.version>3.22.0</assertj.version>
+        <mockito.version>4.4.0</mockito.version>
+        <openmessaging-connector.version>0.1.2</openmessaging-connector.version>
+        <commons-lang3.version>3.12.0</commons-lang3.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <eventbridge-client.version>1.3.5</eventbridge-client.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.rocketmq.connect.eventbridge.sink.EventBridgeSinkConnector</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>${openmessaging-connector.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>eventbridge-client</artifactId>
+            <version>${eventbridge-client.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
new file mode 100644
index 0000000..f986a0a
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkConnector.java
@@ -0,0 +1,96 @@
+package org.apache.rocketmq.connect.eventbridge.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventBridgeSinkConnector extends SinkConnector {
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String accountEndpoint;
+
+    private String eventId;
+
+    private String eventSource;
+
+    private String eventTime;
+
+    private String eventType;
+
+    private String eventSubject;
+
+    private String aliyuneventbusname;
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<KeyValue> keyValueList = new ArrayList<>(11);
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, accessKeyId);
+        keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, accessKeySecret);
+        keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, accountEndpoint);
+        keyValue.put(EventBridgeConstant.EVENT_ID, eventId);
+        keyValue.put(EventBridgeConstant.EVENT_SOURCE, eventSource);
+        keyValue.put(EventBridgeConstant.EVENT_TIME, eventTime);
+        keyValue.put(EventBridgeConstant.EVENT_TYPE, eventType);
+        keyValue.put(EventBridgeConstant.EVENT_SUBJECT, eventSubject);
+        keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, aliyuneventbusname);
+        keyValueList.add(keyValue);
+        return keyValueList;
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return EventBridgeSinkTask.class;
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+        if (StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_ID))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCESS_KEY_SECRET))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_ID))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_SOURCE))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TIME))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.EVENT_TYPE))
+                || StringUtils.isBlank(config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME))) {
+            throw new RuntimeException("EventBridge required parameter is null !");
+        }
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
+        accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
+        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+        eventId = config.getString(EventBridgeConstant.EVENT_ID);
+        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+        eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
+        eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
+        aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+    }
+
+    @Override
+    public void stop() {
+
+    }
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
new file mode 100644
index 0000000..3af4564
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTask.java
@@ -0,0 +1,114 @@
+package org.apache.rocketmq.connect.eventbridge.sink;
+
+import com.aliyun.eventbridge.EventBridgeClient;
+import com.aliyun.eventbridge.models.CloudEvent;
+import com.aliyun.eventbridge.models.Config;
+import com.aliyun.eventbridge.models.PutEventsResponse;
+import com.aliyun.eventbridge.util.EventBuilder;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+import org.apache.rocketmq.connect.eventbridge.sink.utils.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class EventBridgeSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(EventBridgeSinkTask.class);
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String accountEndpoint;
+
+    private String eventId;
+
+    private String eventSource;
+
+    private String eventTime;
+
+    private String eventType;
+
+    private String eventSubject;
+
+    private String aliyuneventbusname;
+
+    private EventBridgeClient eventBridgeClient;
+
+    @Override
+    public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        List<CloudEvent> cloudEventList = new ArrayList<>();
+        try {
+            sinkRecords.forEach(connectRecord -> cloudEventList.add(EventBuilder.builder()
+                    .withId(eventId)
+                    .withSource(URI.create(eventSource))
+                    .withType(eventType)
+                    .withSubject(eventSubject)
+                    .withTime(DateUtils.getDate(eventTime, DateUtils.DEFAULT_DATE_FORMAT))
+                    .withJsonStringData(connectRecord.getData().toString())
+                    .withAliyunEventBus(aliyuneventbusname)
+                    .build()));
+            PutEventsResponse putEventsResponse = eventBridgeClient.putEvents(cloudEventList);
+            log.info("EventBridgeSinkTask | put | putEventsResponse | entryList : {} | requestId : {}", putEventsResponse.getEntryList(), putEventsResponse.getRequestId());
+        } catch (Exception e) {
+            log.error("EventBridgeSinkTask | put | error => ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public void validate(KeyValue config) {
+
+    }
+
+    @Override
+    public void init(KeyValue config) {
+        accessKeyId = config.getString(EventBridgeConstant.ACCESS_KEY_ID);
+        accessKeySecret = config.getString(EventBridgeConstant.ACCESS_KEY_SECRET);
+        accountEndpoint = config.getString(EventBridgeConstant.ACCOUNT_ENDPOINT);
+        eventId = config.getString(EventBridgeConstant.EVENT_ID);
+        eventSource = config.getString(EventBridgeConstant.EVENT_SOURCE);
+        eventTime = config.getString(EventBridgeConstant.EVENT_TIME);
+        eventType = config.getString(EventBridgeConstant.EVENT_TYPE);
+        eventSubject = config.getString(EventBridgeConstant.EVENT_SUBJECT);
+        aliyuneventbusname = config.getString(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME);
+    }
+
+    @Override
+    public void start(SinkTaskContext sinkTaskContext) {
+        super.start(sinkTaskContext);
+        try {
+            Config authConfig = new Config();
+            authConfig.accessKeyId = accessKeyId;
+            authConfig.accessKeySecret = accessKeySecret;
+            authConfig.endpoint = accountEndpoint;
+            eventBridgeClient = new EventBridgeClient(authConfig);
+        } catch (Exception e) {
+            log.error("EventBridgeSinkTask | start | error => ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        eventBridgeClient = null;
+    }
+
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
new file mode 100644
index 0000000..51a7508
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/constant/EventBridgeConstant.java
@@ -0,0 +1,14 @@
+package org.apache.rocketmq.connect.eventbridge.sink.constant;
+
+public class EventBridgeConstant {
+
+    public static final String ACCESS_KEY_ID = "accessKeyId";
+    public static final String ACCESS_KEY_SECRET = "accessKeySecret";
+    public static final String ACCOUNT_ENDPOINT = "accountEndpoint";
+    public static final String EVENT_ID = "eventId";
+    public static final String EVENT_SOURCE = "eventSource";
+    public static final String EVENT_TIME = "eventTime";
+    public static final String EVENT_TYPE = "eventType";
+    public static final String EVENT_SUBJECT = "eventSubject";
+    public static final String ALIYUN_EVENT_BUS_NAME = "aliyuneventbusname";
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
new file mode 100644
index 0000000..c361653
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/main/java/org/apache/rocketmq/connect/eventbridge/sink/utils/DateUtils.java
@@ -0,0 +1,16 @@
+package org.apache.rocketmq.connect.eventbridge.sink.utils;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+
+public class DateUtils {
+    public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
+
+    public static Date getDate(String date, String dateFormat) {
+        final LocalDateTime parse = LocalDateTime.parse(date, DateTimeFormatter.ofPattern(dateFormat));
+        return Date.from(parse.atZone(ZoneId.systemDefault()).toInstant());
+    }
+
+}
diff --git a/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
new file mode 100644
index 0000000..030794c
--- /dev/null
+++ b/connectors/aliyun/rocketmq-connect-eventbridge/src/test/java/org/apache/rocketmq/connect/eventbridge/sink/EventBridgeSinkTest.java
@@ -0,0 +1,87 @@
+package org.apache.rocketmq.connect.eventbridge.sink;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.eventbridge.sink.constant.EventBridgeConstant;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+
+public class EventBridgeSinkTest {
+
+    @Test
+    public void testTaskConfigs() {
+        EventBridgeSinkConnector eventBridgeSinkConnector = new EventBridgeSinkConnector();
+        Assert.assertEquals(eventBridgeSinkConnector.taskConfigs(1).size(), 1);
+    }
+
+    @Test
+    public void testPut() {
+        EventBridgeSinkTask eventBridgeSinkTask = new EventBridgeSinkTask();
+        KeyValue keyValue = new DefaultKeyValue();
+        keyValue.put(EventBridgeConstant.ACCESS_KEY_ID, "xxxx");
+        keyValue.put(EventBridgeConstant.ACCESS_KEY_SECRET, "xxxx");
+        keyValue.put(EventBridgeConstant.ACCOUNT_ENDPOINT, "xxxx");
+        keyValue.put(EventBridgeConstant.EVENT_ID, UUID.randomUUID().toString());
+        keyValue.put(EventBridgeConstant.EVENT_SOURCE, "xxxx");
+        keyValue.put(EventBridgeConstant.EVENT_TIME, "2022-04-24 16:12:00");
+        keyValue.put(EventBridgeConstant.EVENT_TYPE, "xxxx");
+        keyValue.put(EventBridgeConstant.EVENT_SUBJECT, "xxxx");
+        keyValue.put(EventBridgeConstant.ALIYUN_EVENT_BUS_NAME, "xxxx");
+        eventBridgeSinkTask.init(keyValue);
+        List<ConnectRecord> connectRecordList = new ArrayList<>();
+        ConnectRecord connectRecord = new ConnectRecord(null, null, System.currentTimeMillis());
+        connectRecord.setData("{\n" +
+                "\t\"test\" :  \"test\"\n" +
+                "}");
+        connectRecordList.add(connectRecord);
+        eventBridgeSinkTask.start(new SinkTaskContext() {
+            @Override
+            public String getConnectorName() {
+                return null;
+            }
+
+            @Override
+            public String getTaskName() {
+                return null;
+            }
+
+            @Override
+            public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
+
+            }
+
+            @Override
+            public void resetOffset(Map<RecordPartition, RecordOffset> map) {
+
+            }
+
+            @Override
+            public void pause(List<RecordPartition> list) {
+
+            }
+
+            @Override
+            public void resume(List<RecordPartition> list) {
+
+            }
+
+            @Override
+            public Set<RecordPartition> assignment() {
+                return null;
+            }
+        });
+        eventBridgeSinkTask.put(connectRecordList);
+    }
+
+}