You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by sh...@apache.org on 2023/05/15 03:12:23 UTC
[rocketmq-eventbridge] branch runtimer updated: feat:support standard sink connector.
This is an automated email from the ASF dual-hosted git repository.
shenlin pushed a commit to branch runtimer
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
The following commit(s) were added to refs/heads/runtimer by this push:
new dbc82d5 feat:support standard sink connector.
dbc82d5 is described below
commit dbc82d5113744436cb53c3f27ffc715af5f27835
Author: 2011shenlin <20...@gmail.com>
AuthorDate: Sat May 13 08:13:30 2023 +0800
feat:support standard sink connector.
---
.../impl/runtime/RuntimeTargetRunnerAPIImpl.java | 1 +
.../adapter/runtimer/common/entity/RunOptions.java | 29 ----
.../runtimer/common/entity/TargetRunnerConfig.java | 14 +-
.../adapter/runtimer/error/ErrorHandler.java | 2 +-
.../AbstractTargetRunnerConfigObserver.java | 32 +++-
.../service/TargetRunnerConfigOnDBObserver.java | 21 +--
.../service/TargetRunnerConfigOnFileObserver.java | 28 ----
.../eventbridge/EventBridgeFilterTransform.java | 2 -
supports/connect-standard/README.md | 0
supports/connect-standard/pom.xml | 177 +++++++++++++++++++++
.../apache/rocketmq/connect/StandardSinkTask.java | 55 +++++++
11 files changed, 276 insertions(+), 85 deletions(-)
diff --git a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
index fed9b4b..ee66625 100644
--- a/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
+++ b/adapter/rpc/src/main/java/org/apache/rocketmq/eventbridge/adapter/rpc/impl/runtime/RuntimeTargetRunnerAPIImpl.java
@@ -69,6 +69,7 @@ public class RuntimeTargetRunnerAPIImpl implements TargetRunnerAPI {
components.add(filterComponent);
components.add(transformComponent);
components.add(targetComponent);
+ targetRunnerConfig.setRunOptions(runOptions);
return targetRunnerConfig;
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java
deleted file mode 100644
index 4e44981..0000000
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/RunOptions.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity;
-
-import lombok.Data;
-
-@Data
-public class RunOptions {
-
- private String errorsTolerance;
-
- private String retryStrategy;
-
-}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
index 20f3d5a..d97a147 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/common/entity/TargetRunnerConfig.java
@@ -22,6 +22,10 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.Data;
+import org.apache.rocketmq.eventbridge.domain.model.run.RetryStrategy;
+import org.apache.rocketmq.eventbridge.domain.model.run.RunOptions;
+import org.apache.rocketmq.eventbridge.enums.ErrorToleranceEnum;
+import org.apache.rocketmq.eventbridge.enums.PushRetryStrategyEnum;
import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.TARGET_RUNNER_KEY;
@@ -38,7 +42,7 @@ public class TargetRunnerConfig implements Serializable {
*/
private List<Map<String, String>> components;
- private RunOptions runOptions = new RunOptions();
+ private RunOptions runOptions = RunOptions.builder().errorsTolerance(ErrorToleranceEnum.ALL).retryStrategy(RetryStrategy.builder().pushRetryStrategy(PushRetryStrategyEnum.EXPONENTIAL_DECAY_RETRY).build()).build();
@Override
public boolean equals(Object o) {
@@ -58,10 +62,10 @@ public class TargetRunnerConfig implements Serializable {
@Override
public String toString() {
return "TargetRunnerConfig{" +
- "name='" + name + '\'' +
- ", components=" + components +
- ", runOptions=" + runOptions +
- '}';
+ "name='" + name + '\'' +
+ ", components=" + components +
+ ", runOptions=" + runOptions +
+ '}';
}
private boolean isEqualsComponents(List<Map<String, String>> source, List<Map<String, String>> target) {
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
index 9aac1d5..8a59c35 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/error/ErrorHandler.java
@@ -45,7 +45,7 @@ public class ErrorHandler {
String eventRunnerName = connectRecord.getExtension(RUNNER_NAME);
TargetRunnerConfig targetRunnerConfig = TargetRunnerContext.getTargetRunnerConfig(eventRunnerName);
String eventBusName = targetRunnerConfig.getEventBusName();
- PushRetryStrategyEnum pushRetryStrategyEnum = PushRetryStrategyEnum.parse(targetRunnerConfig.getRunOptions().getRetryStrategy());
+ PushRetryStrategyEnum pushRetryStrategyEnum = targetRunnerConfig.getRunOptions().getRetryStrategy().getPushRetryStrategy();
int retryTimes = parseRetryTimes(connectRecord);
int delaySec = calcDelaySec(retryTimes, pushRetryStrategyEnum);
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
index 5cdbe62..7157708 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/AbstractTargetRunnerConfigObserver.java
@@ -17,13 +17,12 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.TargetRunnerListener;
-import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetKeyValue;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.entity.TargetRunnerConfig;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
@@ -88,4 +87,33 @@ public abstract class AbstractTargetRunnerConfigObserver implements TargetRunner
listener.onDeleteTargetRunner(targetRunnerConfig);
}
}
+
+ protected void diff() {
+ Map<String, TargetRunnerConfig> lastMap = toMap(this.getTargetRunnerConfig());
+ Map<String, TargetRunnerConfig> latestMap = toMap(this.getLatestTargetRunnerConfig());
+ lastMap.entrySet().forEach(entry -> {
+ TargetRunnerConfig latest = latestMap.get(entry.getKey());
+ if (latest == null) {
+ this.onDeleteTargetRunner(entry.getValue());
+ } else if (!latest.equals(entry.getValue())) {
+ this.onUpdateTargetRunner(entry.getValue());
+ }
+ });
+
+ latestMap.entrySet().forEach(entry -> {
+ TargetRunnerConfig latest = lastMap.get(entry.getKey());
+ if (latest == null) {
+ this.onAddTargetRunner(entry.getValue());
+ }
+ });
+ }
+
+ protected Map<String, TargetRunnerConfig> toMap(Set<TargetRunnerConfig> targetRunnerConfigs) {
+ if (targetRunnerConfigs == null || targetRunnerConfigs.isEmpty()) {
+ return Maps.newHashMapWithExpectedSize(0);
+ }
+ Map<String, TargetRunnerConfig> map = Maps.newHashMap();
+ targetRunnerConfigs.forEach(entry -> map.put(entry.getName(), entry));
+ return map;
+ }
}
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
index 5e74f56..071ca48 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnDBObserver.java
@@ -17,11 +17,9 @@
package org.apache.rocketmq.eventbridge.adapter.runtimer.service;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -37,8 +35,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
-import static org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine.TARGET_RUNNER_KEY;
-
@Slf4j
@Component
public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigObserver {
@@ -66,26 +62,15 @@ public class TargetRunnerConfigOnDBObserver extends AbstractTargetRunnerConfigOb
return targetRunnerConfigs;
}
- private Map<String, String> buildEventBusComponent(String eventBusName) {
- Map<String, String> component = Maps.newHashMap();
- component.put(TARGET_RUNNER_KEY, eventBusName);
- return component;
- }
-
@PostConstruct
public void addListen() {
service.scheduleAtFixedRate(() -> {
try {
- Set<TargetRunnerConfig> latest = this.getLatestTargetRunnerConfig();
- Set<TargetRunnerConfig> last = super.getTargetRunnerConfig();
- TargetRunnerConfig changed = null;
- super.onAddTargetRunner(changed);
- super.onUpdateTargetRunner(changed);
- super.onDeleteTargetRunner(changed);
+ super.diff();
} catch (Throwable e) {
- log.error("Watch failed.", e);
+ log.error("Watch file failed.", e);
}
- }, 0, 30, TimeUnit.SECONDS);
+ }, 0, 3, TimeUnit.SECONDS);
}
}
\ No newline at end of file
diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
index 791efc1..3948ff7 100644
--- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
+++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java
@@ -111,34 +111,6 @@ public class TargetRunnerConfigOnFileObserver extends AbstractTargetRunnerConfig
}, 0, 3, TimeUnit.SECONDS);
}
- public void diff() {
- Map<String, TargetRunnerConfig> lastMap = toMap(super.getTargetRunnerConfig());
- Map<String, TargetRunnerConfig> latestMap = toMap(this.getLatestTargetRunnerConfig());
- lastMap.entrySet().forEach(entry -> {
- TargetRunnerConfig latest = latestMap.get(entry.getKey());
- if (latest == null) {
- super.onDeleteTargetRunner(entry.getValue());
- } else if (!latest.equals(entry.getValue())) {
- super.onUpdateTargetRunner(entry.getValue());
- }
- });
-
- latestMap.entrySet().forEach(entry -> {
- TargetRunnerConfig latest = lastMap.get(entry.getKey());
- if (latest == null) {
- super.onAddTargetRunner(entry.getValue());
- }
- });
- }
-
- private Map<String, TargetRunnerConfig> toMap(Set<TargetRunnerConfig> targetRunnerConfigs) {
- if (targetRunnerConfigs == null || targetRunnerConfigs.isEmpty()) {
- return Maps.newHashMapWithExpectedSize(0);
- }
- Map<String, TargetRunnerConfig> map = Maps.newHashMap();
- targetRunnerConfigs.forEach(entry -> map.put(entry.getName(), entry));
- return map;
- }
private String getConfigFilePath() {
return this.getClass().getClassLoader().getResource(DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME).getPath();
diff --git a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
index 9f704b3..7105fa7 100644
--- a/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
+++ b/supports/connect-filter-transform/src/main/java/org/apache/rocketmq/connect/transform/eventbridge/EventBridgeFilterTransform.java
@@ -34,7 +34,6 @@ public class EventBridgeFilterTransform implements io.openmessaging.connector.ap
@Override
public ConnectRecord doTransform(ConnectRecord record) {
- System.out.println("Start to filter transform:::::::::::::::::");
if (!evaluator.evaluateData(new Gson().toJson(record.getData()))) {
return null;
} else if (!evaluator.evaluateSpecAttr(this.buildSpecAttr(record))) {
@@ -42,7 +41,6 @@ public class EventBridgeFilterTransform implements io.openmessaging.connector.ap
} else if (!evaluator.evaluateExtensionAttr(this.buildExtensionAttr(record))) {
return null;
} else {
- System.out.println("end to filter transform:::::::::::::::::");
return record;
}
}
diff --git a/supports/connect-standard/README.md b/supports/connect-standard/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/supports/connect-standard/pom.xml b/supports/connect-standard/pom.xml
new file mode 100644
index 0000000..90c8bb5
--- /dev/null
+++ b/supports/connect-standard/pom.xml
@@ -0,0 +1,177 @@
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>connect-standard</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <openmessaging-connector.version>0.1.2</openmessaging-connector.version>
+ <gson.version>2.8.9</gson.version>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>${openmessaging-connector.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>RELEASE</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform
+ </mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java b/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java
new file mode 100644
index 0000000..9714be2
--- /dev/null
+++ b/supports/connect-standard/src/main/java/org/apache/rocketmq/connect/StandardSinkTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect;
+
+import com.google.gson.Gson;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import java.util.List;
+
+public class StandardSinkTask extends SinkTask {
+
+ @Override public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
+ if (sinkRecords == null || sinkRecords.isEmpty()) {
+ return;
+ }
+ sinkRecords.forEach(sinkRecord -> System.out.println(new Gson().toJson(sinkRecord)));
+ }
+
+ @Override public void pause() {
+
+ }
+
+ @Override public void resume() {
+
+ }
+
+ @Override public void validate(KeyValue config) {
+
+ }
+
+ @Override public void init(KeyValue config) {
+
+ }
+
+ @Override public void stop() {
+
+ }
+}
\ No newline at end of file