You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/05/15 03:12:23 UTC

[rocketmq-eventbridge] branch runtimer updated: feat:support standard sink connector.

This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git


The following commit(s) were added to refs/heads/runtimer by this push:
     new dbc82d5  feat:support standard sink connector.
dbc82d5 is described below

commit dbc82d5113744436cb53c3f27ffc715af5f27835
Author: 2011shenlin <20...@gmail.com>
AuthorDate: Sat May 13 08:13:30 2023 +0800

    feat:support standard sink connector.
---
 .../impl/runtime/RuntimeTargetRunnerAPIImpl.java   |   1 +
 .../adapter/runtimer/common/entity/RunOptions.java |  29 ----
 .../runtimer/common/entity/TargetRunnerConfig.java |  14 +-
 .../adapter/runtimer/error/ErrorHandler.java       |   2 +-
 .../AbstractTargetRunnerConfigObserver.java        |  32 +++-
 .../service/TargetRunnerConfigOnDBObserver.java    |  21 +--
 .../service/TargetRunnerConfigOnFileObserver.java  |  28 ----
 .../eventbridge/EventBridgeFilterTransform.java    |   2 -
 supports/connect-standard/README.md                |   0
 supports/connect-standard/pom.xml                  | 177 +++++++++++++++++++++
 .../apache/rocketmq/connect/StandardSinkTask.java  |  55 +++++++
 11 files changed, 276 insertions(+), 85 deletions(-)

diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
index fed9b4b..ee66625 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
@@ -69,6 +69,7 @@ public class RuntimeTargetRunnerAPIImpl implements TargetRunnerAPI {
         components.add(filterComponent);
         components.add(transformComponent);
         components.add(targetComponent);
+        targetRunnerConfig.setRunOptions(runOptions);
         return targetRunnerConfig;
     }
 
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java
deleted file mode 100644
index 4e44981..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
-
-import lombok.Data;
-
-@Data
-public class RunOptions {
-
-    private String errorsTolerance;
-
-    private String retryStrategy;
-
-}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index 20f3d5a..d97a147 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -22,6 +22,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import lombok.Data;
+import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
+import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
 
 import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.TARGET_RUNNER_KEY;
 
@@ -38,7 +42,7 @@ public class TargetRunnerConfig implements Serializable {
      */
     private List<Map<String, String>> components;
 
-    private RunOptions runOptions = new RunOptions();
+    private RunOptions runOptions = RunOptions.builder().errorsTolerance(ErrorToleranceEnum.ALL).retryStrategy(RetryStrategy.builder().pushRetryStrategy(PushRetryStrategyEnum.EXPONENTIAL_DECAY_RETRY).build()).build();
 
     @Override
     public boolean equals(Object o) {
@@ -58,10 +62,10 @@ public class TargetRunnerConfig implements Serializable {
     @Override
     public String toString() {
         return "TargetRunnerConfig{" +
-                "name='" + name + '\'' +
-                ", components=" + components +
-                ", runOptions=" + runOptions +
-                '}';
+            "name='" + name + '\'' +
+            ", components=" + components +
+            ", runOptions=" + runOptions +
+            '}';
     }
 
     private boolean isEqualsComponents(List<Map<String, String>> source, List<Map<String, String>> target) {
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
index 9aac1d5..8a59c35 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
@@ -45,7 +45,7 @@ public class ErrorHandler {
         String eventRunnerName = connectRecord.getExtension(RUNNER_NAME);
         TargetRunnerConfig targetRunnerConfig = TargetRunnerContext.getTargetRunnerConfig(eventRunnerName);
         String eventBusName = targetRunnerConfig.getEventBusName();
-        PushRetryStrategyEnum pushRetryStrategyEnum = PushRetryStrategyEnum.parse(targetRunnerConfig.getRunOptions().getRetryStrategy());
+        PushRetryStrategyEnum pushRetryStrategyEnum = targetRunnerConfig.getRunOptions().getRetryStrategy().getPushRetryStrategy();
 
         int retryTimes = parseRetryTimes(connectRecord);
         int delaySec = calcDelaySec(retryTimes, pushRetryStrategyEnum);
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
index 5cdbe62..7157708 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
@@ -17,13 +17,12 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
 import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
@@ -88,4 +87,33 @@ public abstract class AbstractTargetRunnerConfigObserver implements TargetRunner
             listener.onDeleteTargetRunner(targetRunnerConfig);
         }
     }
+
+    protected void diff() {
+        Map<String, TargetRunnerConfig> lastMap = toMap(this.getTargetRunnerConfig());
+        Map<String, TargetRunnerConfig> latestMap = toMap(this.getLatestTargetRunnerConfig());
+        lastMap.entrySet().forEach(entry -> {
+            TargetRunnerConfig latest = latestMap.get(entry.getKey());
+            if (latest == null) {
+                this.onDeleteTargetRunner(entry.getValue());
+            } else if (!latest.equals(entry.getValue())) {
+                this.onUpdateTargetRunner(entry.getValue());
+            }
+        });
+
+        latestMap.entrySet().forEach(entry -> {
+            TargetRunnerConfig latest = lastMap.get(entry.getKey());
+            if (latest == null) {
+                this.onAddTargetRunner(entry.getValue());
+            }
+        });
+    }
+
+    protected Map<String, TargetRunnerConfig> toMap(Set<TargetRunnerConfig> targetRunnerConfigs) {
+        if (targetRunnerConfigs == null || targetRunnerConfigs.isEmpty()) {
+            return Maps.newHashMapWithExpectedSize(0);
+        }
+        Map<String, TargetRunnerConfig> map = Maps.newHashMap();
+        targetRunnerConfigs.forEach(entry -> map.put(entry.getName(), entry));
+        return map;
+    }
 }
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
index 5e74f56..071ca48 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
@@ -17,11 +17,9 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -37,8 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.transaction.annotation.Transactional;
 
-import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.TARGET_RUNNER_KEY;
-
 @Slf4j
 @Component
 public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigObserver {
@@ -66,26 +62,15 @@ public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigOb
         return targetRunnerConfigs;
     }
 
-    private Map<String, String> buildEventBusComponent(String eventBusName) {
-        Map<String, String> component = Maps.newHashMap();
-        component.put(TARGET_RUNNER_KEY, eventBusName);
-        return component;
-    }
-
     @PostConstruct
     public void addListen() {
         service.scheduleAtFixedRate(() -> {
             try {
-                Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
-                Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
-                TargetRunnerConfig changed = null;
-                super.onAddTargetRunner(changed);
-                super.onUpdateTargetRunner(changed);
-                super.onDeleteTargetRunner(changed);
+                super.diff();
             } catch (Throwable e) {
-                log.error("Watch failed.", e);
+                log.error("Watch file failed.", e);
             }
-        }, 0, 30, TimeUnit.SECONDS);
+        }, 0, 3, TimeUnit.SECONDS);
     }
 
 }
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
index 791efc1..3948ff7 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -111,34 +111,6 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
         }, 0, 3, TimeUnit.SECONDS);
     }
 
-    public void diff() {
-        Map<String, TargetRunnerConfig> lastMap = toMap(super.getTargetRunnerConfig());
-        Map<String, TargetRunnerConfig> latestMap = toMap(this.getLatestTargetRunnerConfig());
-        lastMap.entrySet().forEach(entry -> {
-            TargetRunnerConfig latest = latestMap.get(entry.getKey());
-            if (latest == null) {
-                super.onDeleteTargetRunner(entry.getValue());
-            } else if (!latest.equals(entry.getValue())) {
-                super.onUpdateTargetRunner(entry.getValue());
-            }
-        });
-
-        latestMap.entrySet().forEach(entry -> {
-            TargetRunnerConfig latest = lastMap.get(entry.getKey());
-            if (latest == null) {
-                super.onAddTargetRunner(entry.getValue());
-            }
-        });
-    }
-
-    private Map<String, TargetRunnerConfig> toMap(Set<TargetRunnerConfig> targetRunnerConfigs) {
-        if (targetRunnerConfigs == null || targetRunnerConfigs.isEmpty()) {
-            return Maps.newHashMapWithExpectedSize(0);
-        }
-        Map<String, TargetRunnerConfig> map = Maps.newHashMap();
-        targetRunnerConfigs.forEach(entry -> map.put(entry.getName(), entry));
-        return map;
-    }
 
     private String getConfigFilePath() {
         return this.getClass().getClassLoader().getResource(DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME).getPath();
diff --git a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
index 9f704b3..7105fa7 100644
--- a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
+++ b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
@@ -34,7 +34,6 @@ public class EventBridgeFilterTransform implements io.openmessaging.connector.ap
 
     @Override
     public ConnectRecord doTransform(ConnectRecord record) {
-        System.out.println("Start to filter transform:::::::::::::::::");
         if (!evaluator.evaluateData(new Gson().toJson(record.getData()))) {
             return null;
         } else if (!evaluator.evaluateSpecAttr(this.buildSpecAttr(record))) {
@@ -42,7 +41,6 @@ public class EventBridgeFilterTransform implements io.openmessaging.connector.ap
         } else if (!evaluator.evaluateExtensionAttr(this.buildExtensionAttr(record))) {
             return null;
         } else {
-            System.out.println("end to filter transform:::::::::::::::::");
             return record;
         }
     }
diff --git a/supports/connect-standard/README.md b/supports/connect-standard/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/supports/connect-standard/pom.xml b/supports/connect-standard/pom.xml
new file mode 100644
index 0000000..90c8bb5
--- /dev/null
+++ b/supports/connect-standard/pom.xml
@@ -0,0 +1,177 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+	license agreements. See the NOTICE file distributed with this work for additional
+	information regarding copyright ownership. The ASF licenses this file to
+	You under the Apache License, Version 2.0 (the "License"); you may not use
+	this file except in compliance with the License. You may obtain a copy of
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+	by applicable law or agreed to in writing, software distributed under the
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+	OF ANY KIND, either express or implied. See the License for the specific
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>connect-standard</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <modelVersion>4.0.0</modelVersion>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <openmessaging-connector.version>0.1.2</openmessaging-connector.version>
+        <gson.version>2.8.9</gson.version>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.google.code.gson</groupId>
+                <artifactId>gson</artifactId>
+                <version>${gson.version}</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+    <dependencies>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>${openmessaging-connector.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>RELEASE</version>
+            <scope>test</scope>
+        </dependency>
+
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform
+                            </mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java b/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java
new file mode 100644
index 0000000..9714be2
--- /dev/null
+++ b/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect;
+
+import com.google.gson.Gson;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.List;
+
+public class StandardSinkTask extends SinkTask {
+
+    @Override public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+        if (sinkRecords == null || sinkRecords.isEmpty()) {
+            return;
+        }
+        sinkRecords.forEach(sinkRecord -> System.out.println(new Gson().toJson(sinkRecord)));
+    }
+
+    @Override public void pause() {
+
+    }
+
+    @Override public void resume() {
+
+    }
+
+    @Override public void validate(KeyValue config) {
+
+    }
+
+    @Override public void init(KeyValue config) {
+
+    }
+
+    @Override public void stop() {
+
+    }
+}
\ No newline at end of file