You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:21:38 UTC
[rocketmq-connect] 02/07: rocketmq-connect-kafka
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit bbb12023c30a11d1b6190d0e00fdb5c6422d6f05
Author: jonnxu <jo...@163.com>
AuthorDate: Fri Jul 5 01:39:45 2019 +0800
rocketmq-connect-kafka
---
pom.xml | 210 ++++++++++++++++++
.../org/apache/rocketmq/connect/kafka/Config.java | 116 ++++++++++
.../kafka/connector/KafkaSourceConnector.java | 103 +++++++++
.../connect/kafka/connector/KafkaSourceTask.java | 247 +++++++++++++++++++++
src/main/resources/connect-kafka-source.properties | 23 ++
.../kafka/connector/KafkaSourceConnectorTest.java | 56 +++++
.../kafka/connector/KafkaSourceTaskTest.java | 43 ++++
7 files changed, 798 insertions(+)
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..08712ca
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,210 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-kafka</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>rocketmq-connect-kafka</name>
+    <!-- "jar" (not "pom"): this module ships Java sources under src/main/java that must be compiled and packaged. -->
+    <packaging>jar</packaging>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+
+ <scm>
+ <url>https://github.com/openmessaging/openmessaging-connector</url>
+ <connection>scm:git:git@github.com:openmessaging/openmessaging-connector.git</connection>
+ <developerConnection>scm:git:git@github.com:openmessaging/openmessaging-connector.git</developerConnection>
+ <tag>HEAD</tag>
+ </scm>
+
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+ <!-- Compiler settings properties -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <!-- Replacement for the deprecated forkMode=always: one fresh JVM per test class. -->
+                    <forkCount>1</forkCount>
+                    <reuseForks>false</reuseForks>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.11.0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ <!--
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>2.6.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.6.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.5</version>
+ <scope>test</scope>
+ </dependency>
+ -->
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connect-runtime</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>0.1.0-beta</version>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ <version>1.0.0-alpha</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-openmessaging</artifactId>
+ <version>4.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.51</version>
+ </dependency>
+ <dependency>
+ <groupId>io.javalin</groupId>
+ <artifactId>javalin</artifactId>
+ <version>1.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.7</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.13</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.0.13</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java b/src/main/java/org/apache/rocketmq/connect/kafka/Config.java
new file mode 100644
index 0000000..869597e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/Config.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.*;
+
+public class Config {
+
+ public static String TASK_NUM = "tasks.num";
+ public static String TOPICS = "kafka.topics";
+ public static String GROUP_ID = "kafka.group.id";
+ public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server";
+ public static String ROCKETMQ_TOPIC = "rocketmq.topic";
+
+ private String bootstrapServers;
+ private String topics;
+ private String groupId;
+
+ public String getTopics() {
+ return topics;
+ }
+
+ public void setTopics(String topics) {
+ this.topics = topics;
+ }
+
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
+ public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
+ {
+ add(TOPICS);
+ add(GROUP_ID);
+ add(BOOTSTRAP_SERVER);
+ }
+ };
+
+ public void load(KeyValue props) {
+ properties2Object(props, this);
+ }
+
+ private void properties2Object(final KeyValue p, final Object object) {
+
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(4);
+ String first = mn.substring(3, 4);
+
+ String key = first.toLowerCase() + tmp;
+ String property = p.getString(key);
+ if (property != null) {
+ Class<?>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg;
+ if (cn.equals("int") || cn.equals("Integer")) {
+ arg = Integer.parseInt(property);
+ } else if (cn.equals("long") || cn.equals("Long")) {
+ arg = Long.parseLong(property);
+ } else if (cn.equals("double") || cn.equals("Double")) {
+ arg = Double.parseDouble(property);
+ } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+ arg = Boolean.parseBoolean(property);
+ } else if (cn.equals("float") || cn.equals("Float")) {
+ arg = Float.parseFloat(property);
+ } else if (cn.equals("String")) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, arg);
+ }
+ }
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+
+ public static Set<String> getRequestConfig() {
+ return REQUEST_CONFIG;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
new file mode 100644
index 0000000..ba30901
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connect.runtime.common.ConnectKeyValue;
+import io.openmessaging.connect.runtime.config.RuntimeConfigDefine;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import org.apache.rocketmq.connect.kafka.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Source connector that validates the Kafka connection settings and hands an
+ * identical configuration to every {@link KafkaSourceTask} it asks the runtime to start.
+ */
+public class KafkaSourceConnector extends SourceConnector {
+    private static final Logger log = LoggerFactory.getLogger(KafkaSourceConnector.class);
+
+    /** Configuration accepted by the last successful {@link #verifyAndSetConfig(KeyValue)} call. */
+    private KeyValue connectConfig;
+
+    public KafkaSourceConnector() {
+        super();
+    }
+
+    /**
+     * Checks that every key in {@link Config#REQUEST_CONFIG} is present and, if so,
+     * stores {@code config} for later use by {@link #taskConfigs()}.
+     *
+     * @param config connector configuration to validate
+     * @return an empty string on success, otherwise a message naming the first missing key
+     */
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        log.info("KafkaSourceConnector verifyAndSetConfig enter");
+        for (String key : config.keySet()) {
+            log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key));
+        }
+
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                return "Request Config key: " + requestKey;
+            }
+        }
+        this.connectConfig = config;
+        return "";
+    }
+
+    @Override
+    public void start() {
+        log.info("KafkaSourceConnector start enter");
+    }
+
+    @Override
+    public void stop() {
+        log.info("KafkaSourceConnector stop enter");
+    }
+
+    /** No-op: the connector holds no resources to pause. */
+    @Override
+    public void pause() {
+    }
+
+    /** No-op: the connector holds no resources to resume. */
+    @Override
+    public void resume() {
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return KafkaSourceTask.class;
+    }
+
+    /**
+     * Builds one configuration per task; the count is read from {@link Config#TASK_NUM}.
+     * Every task receives the same Kafka settings plus the runtime keys the connector was given.
+     *
+     * @return one {@link KeyValue} per task
+     */
+    @Override
+    public List<KeyValue> taskConfigs() {
+        log.info("Source Connector taskConfigs enter");
+        int taskNum = connectConfig.getInt(Config.TASK_NUM);
+        log.info("Source Connector taskConfigs: task_num:{}", taskNum);
+
+        List<KeyValue> configs = new ArrayList<>();
+        for (int i = 0; i < taskNum; ++i) {
+            KeyValue config = new ConnectKeyValue();
+            copyIfPresent(config, Config.BOOTSTRAP_SERVER);
+            copyIfPresent(config, Config.TOPICS);
+            copyIfPresent(config, Config.GROUP_ID);
+
+            copyIfPresent(config, RuntimeConfigDefine.CONNECTOR_CLASS);
+            copyIfPresent(config, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER);
+            copyIfPresent(config, RuntimeConfigDefine.OMS_DRIVER_URL);
+            configs.add(config);
+        }
+        return configs;
+    }
+
+    /**
+     * Copies {@code key} from the connector configuration into {@code target} when a value exists.
+     * NOTE(review): skipping absent keys avoids passing {@code null} to {@code ConnectKeyValue.put},
+     * which presumably rejects null values - confirm against the runtime implementation.
+     */
+    private void copyIfPresent(KeyValue target, String key) {
+        String value = connectConfig.getString(key);
+        if (value != null) {
+            target.put(key, value);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
new file mode 100644
index 0000000..d4b39e0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.source.SourceTask;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.rocketmq.connect.kafka.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.util.*;
+
+public class KafkaSourceTask extends SourceTask {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaSourceTask.class);
+ private KafkaConsumer<ByteBuffer, ByteBuffer> consumer;
+ private KeyValue config;
+ private List<String> topicList;
+ private List<TopicPartition> currentTPList;
+
+    /**
+     * Polls Kafka for up to one second and converts every received record into a
+     * {@link SourceDataEntry}.
+     *
+     * @return the converted entries; an empty collection when nothing was received or
+     *         when polling/conversion failed (the failure is logged)
+     */
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        try {
+            ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(1000);
+            if (records.count() > 0) {
+                log.info("consumer.poll, records.count {}", records.count());
+            }
+            List<SourceDataEntry> entries = new ArrayList<>(records.count());
+            for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
+                entries.add(toSourceDataEntry(record));
+            }
+
+            log.info("poll return entries size {} ", entries.size());
+            return entries;
+        } catch (Exception e) {
+            // Throwable passed as the trailing argument (no placeholder) so the stack trace is logged.
+            log.error("poll exception", e);
+            return new ArrayList<>();
+        }
+    }
+
+    /**
+     * Converts one Kafka record into a source entry with a two-field ("key", "value") schema.
+     * The source partition is encoded as the bytes of "topic-partition" and the source
+     * position as the record offset in an 8-byte buffer.
+     */
+    private SourceDataEntry toSourceDataEntry(ConsumerRecord<ByteBuffer, ByteBuffer> record) {
+        String topicPartition = record.topic() + "-" + record.partition();
+        log.info("Received {} record: {} ", topicPartition, record);
+
+        List<Field> fields = new ArrayList<>();
+        fields.add(new Field(0, "key", FieldType.BYTES));
+        fields.add(new Field(1, "value", FieldType.BYTES));
+        Schema schema = new Schema();
+        schema.setName(record.topic());
+        schema.setFields(fields);
+        schema.setDataSource(record.topic());
+
+        // Explicit charset: getTopicPartition decodes these bytes as UTF-8.
+        ByteBuffer sourcePartition = ByteBuffer.wrap(topicPartition.getBytes(Charset.forName("UTF-8")));
+        ByteBuffer sourcePosition = ByteBuffer.allocate(8);
+        sourcePosition.asLongBuffer().put(record.offset());
+
+        DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+        dataEntryBuilder.entryType(EntryType.CREATE);
+        dataEntryBuilder.queue(record.topic()); //queueName will be set to RocketMQ topic by runtime
+        dataEntryBuilder.timestamp(System.currentTimeMillis());
+        // Kafka permits both a null key and a null value (tombstone); neither may be dereferenced.
+        dataEntryBuilder.putFiled("key",
+            record.key() == null ? null : JSON.toJSONString(record.key().array()));
+        dataEntryBuilder.putFiled("value",
+            record.value() == null ? null : JSON.toJSONString(record.value().array()));
+        return dataEntryBuilder.buildSourceDataEntry(sourcePartition, sourcePosition);
+    }
+
+    /**
+     * Creates the Kafka consumer from the task configuration and subscribes it to the
+     * configured topics.
+     *
+     * @param taskConfig must contain {@link Config#BOOTSTRAP_SERVER}, {@link Config#GROUP_ID}
+     *                   and {@link Config#TOPICS} (comma-separated topic names)
+     * @throws IllegalArgumentException if {@link Config#TOPICS} is absent
+     */
+    @Override
+    public void start(KeyValue taskConfig) {
+        log.info("source task start enter");
+        this.topicList = new ArrayList<>();
+        this.currentTPList = new ArrayList<>();
+        this.config = taskConfig;
+
+        // Parse the topics before creating the consumer so a bad configuration cannot leak one.
+        String topics = this.config.getString(Config.TOPICS);
+        if (topics == null) {
+            throw new IllegalArgumentException("Missing required config: " + Config.TOPICS);
+        }
+        for (String topic : topics.split(",")) {
+            String name = topic.trim(); // tolerate "a, b"
+            if (!name.isEmpty()) {
+                topicList.add(name);
+            }
+        }
+        if (topicList.isEmpty()) {
+            log.warn("source task has no topic to subscribe, {}='{}'", Config.TOPICS, topics);
+        }
+
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(Config.BOOTSTRAP_SERVER));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(Config.GROUP_ID));
+        // NOTE(review): auto-commit is enabled although commitOffset() also commits positions
+        // read from the position storage - confirm which of the two is meant to be authoritative.
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
+
+        this.consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(topicList, new MyRebalanceListener());
+        log.info("source task subscribe topicList {}", topicList);
+    }
+
+    /**
+     * Synchronously commits the positions of the currently assigned partitions and closes
+     * the consumer. The consumer is closed even when the commit fails; failures are logged.
+     */
+    @Override
+    public void stop() {
+        log.info("source task stop enter");
+        if (consumer == null) {
+            // start() never completed, nothing to release.
+            return;
+        }
+        try {
+            commitOffset(currentTPList, true);
+        } catch (Exception e) {
+            log.warn("{} consumer {} commit exception", this, consumer, e);
+        }
+        try {
+            consumer.close();
+        } catch (Exception e) {
+            // Throwable is the trailing, placeholder-less argument so its stack trace is logged.
+            log.warn("{} consumer {} close exception", this, consumer, e);
+        }
+    }
+
+ /** Only logs the request; consumption is not actually suspended by this method. */
+ @Override
+ public void pause() {
+ log.info("source task pause ...");
+ }
+
+ /** Only logs the request; no consumer state is changed by this method. */
+ @Override
+ public void resume() {
+ log.info("source task resume ...");
+ }
+
+    /**
+     * @return a log-friendly identity built from the part of the runtime MX bean name
+     *         before '@' and the calling thread
+     */
+    @Override
+    public String toString() {
+        String name = ManagementFactory.getRuntimeMXBean().getName();
+        String pid = name.split("@")[0];
+        return "KafkaSourceTask-PID[" + pid + "]-" + Thread.currentThread().toString();
+    }
+
+ public static TopicPartition getTopicPartition(ByteBuffer buffer)
+ {
+ Charset charset = null;
+ CharsetDecoder decoder = null;
+ CharBuffer charBuffer = null;
+ try
+ {
+ charset = Charset.forName("UTF-8");
+ decoder = charset.newDecoder();
+ charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
+ String topic_partition = charBuffer.toString();
+ int index = topic_partition.lastIndexOf('-');
+ if (index != -1 && index > 1) {
+ String topic = topic_partition.substring(0, index - 1);
+ int partition = Integer.parseInt(topic_partition.substring(index + 1));
+ return new TopicPartition(topic, partition);
+ }
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ log.warn("getString Exception {}", ex);
+ }
+ return null;
+ }
+
+    /**
+     * Commits to Kafka the positions that the position storage holds for the given partitions.
+     *
+     * <p>The stored position is the offset of the last record handed to the runtime (see
+     * {@link #poll()}); Kafka expects the committed offset to be the offset of the next
+     * record to read, hence the {@code + 1}.
+     *
+     * @param tpList  partitions to commit; nothing happens when {@code null} or empty
+     * @param isClose {@code true} to commit synchronously, {@code false} to commit asynchronously
+     */
+    private void commitOffset(Collection<TopicPartition> tpList, boolean isClose) {
+
+        if (tpList == null || tpList.isEmpty()) {
+            return;
+        }
+
+        log.info("commitOffset {} topic partition {}", KafkaSourceTask.this, tpList);
+        List<ByteBuffer> partitionKeys = new ArrayList<>();
+        for (TopicPartition tp : tpList) {
+            // Same "topic-partition" encoding that poll() uses for the source partition.
+            partitionKeys.add(ByteBuffer.wrap((tp.topic() + "-" + tp.partition()).getBytes(Charset.forName("UTF-8"))));
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
+        Map<ByteBuffer, ByteBuffer> positions = context.positionStorageReader().getPositions(partitionKeys);
+        for (Map.Entry<ByteBuffer, ByteBuffer> entry : positions.entrySet()) {
+            TopicPartition tp = getTopicPartition(entry.getKey());
+            if (tp != null && tpList.contains(tp)) {
+                //positionStorage store more than this task's topic and partition
+                try {
+                    long lastProcessedOffset = entry.getValue().asLongBuffer().get();
+                    commitOffsets.put(tp, new OffsetAndMetadata(lastProcessedOffset + 1));
+                } catch (Exception e) {
+                    log.warn("commitOffset get local offset exception", e);
+                }
+            }
+        }
+
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : commitOffsets.entrySet()) {
+            log.info("commitOffset {}, TopicPartition:{} offset:{}", KafkaSourceTask.this, entry.getKey(), entry.getValue());
+        }
+        if (!commitOffsets.isEmpty()) {
+            if (isClose) {
+                consumer.commitSync(commitOffsets);
+            } else {
+                consumer.commitAsync(commitOffsets, new MyOffsetCommitCallback());
+            }
+        }
+    }
+
+    /** Logs asynchronous offset commits that failed, including the offsets that were not committed. */
+    private class MyOffsetCommitCallback implements OffsetCommitCallback {
+
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
+            if (e == null) {
+                return;
+            }
+            // Throwable is the trailing, placeholder-less argument so its stack trace is logged.
+            log.warn("commit async exception", e);
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
+                log.warn("commit exception, TopicPartition:{} offset: {}", entry.getKey().toString(), entry.getValue().offset());
+            }
+        }
+    }
+
+    /**
+     * Keeps {@code currentTPList} in sync with the partitions assigned to this consumer and
+     * commits stored positions asynchronously for partitions that are being revoked.
+     */
+    private class MyRebalanceListener implements ConsumerRebalanceListener {
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            // Replace the previous assignment with the new one.
+            currentTPList.clear();
+            currentTPList.addAll(partitions);
+            for (TopicPartition assigned : currentTPList) {
+                log.info("onPartitionsAssigned TopicPartition {}", assigned);
+            }
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            log.info("onPartitionsRevoked {} Partitions revoked", KafkaSourceTask.this);
+            try {
+                commitOffset(partitions, false);
+            } catch (Exception commitFailure) {
+                // A failed commit must not abort the rebalance.
+                log.warn("onPartitionsRevoked exception", commitFailure);
+            }
+        }
+    }
+}
diff --git a/src/main/resources/connect-kafka-source.properties b/src/main/resources/connect-kafka-source.properties
new file mode 100644
index 0000000..f974cb9
--- /dev/null
+++ b/src/main/resources/connect-kafka-source.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=rocketmq-connect-kafka
+connector-class=org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector
+oms-driver-url=oms:rocketmq://101.132.96.164:9876/default:default
+source-record-converter=io.openmessaging.connect.runtime.converter.JsonConverter
+# Key names must match the constants in org.apache.rocketmq.connect.kafka.Config.
+tasks.num=2
+# Comma-separated host:port list, e.g. host1:9092,host2:9092
+kafka.bootstrap.server=47.112.213.204:9092
+# Comma-separated list of Kafka topics to consume
+kafka.topics=jonnxu
+kafka.group.id=connect-kafka-source-consumer-group
diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
new file mode 100644
index 0000000..c64b5e7
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.kafka.Config;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Unit tests for {@link KafkaSourceConnector} that need neither a Kafka broker nor a runtime. */
+public class KafkaSourceConnectorTest {
+    KafkaSourceConnector connector = new KafkaSourceConnector();
+
+    /** Each missing required key is reported in turn; a complete configuration yields "". */
+    @Test
+    public void verifyAndSetConfigTest() {
+        KeyValue keyValue = new DefaultKeyValue();
+
+        // The connector iterates REQUEST_CONFIG in the same order, so the first missing
+        // key it reports is always the one about to be added here.
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            assertEquals("Request Config key: " + requestKey, connector.verifyAndSetConfig(keyValue));
+            keyValue.put(requestKey, requestKey);
+        }
+        assertEquals("", connector.verifyAndSetConfig(keyValue));
+    }
+
+    @Test
+    public void taskClassTest() {
+        assertEquals(KafkaSourceTask.class, connector.taskClass());
+    }
+
+    /** With tasks.num=1 exactly one task configuration is produced, carrying the Kafka settings. */
+    @Test
+    public void taskConfigsTest() {
+        KeyValue keyValue = new DefaultKeyValue();
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            keyValue.put(requestKey, requestKey);
+        }
+        keyValue.put(Config.TASK_NUM, "1");
+        // Runtime keys that the connector forwards to every task.
+        keyValue.put(io.openmessaging.connect.runtime.config.RuntimeConfigDefine.CONNECTOR_CLASS,
+            KafkaSourceConnector.class.getName());
+        keyValue.put(io.openmessaging.connect.runtime.config.RuntimeConfigDefine.SOURCE_RECORD_CONVERTER,
+            "converter");
+        keyValue.put(io.openmessaging.connect.runtime.config.RuntimeConfigDefine.OMS_DRIVER_URL,
+            "oms:rocketmq://localhost:9876/default:default");
+        assertEquals("", connector.verifyAndSetConfig(keyValue));
+
+        java.util.List<KeyValue> taskConfigs = connector.taskConfigs();
+        assertEquals(1, taskConfigs.size());
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            assertEquals(keyValue.getString(requestKey), taskConfigs.get(0).getString(requestKey));
+        }
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java
new file mode 100644
index 0000000..57239f6
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/** Broker-free unit tests for {@link KafkaSourceTask}. */
+public class KafkaSourceTaskTest {
+
+    /** Polling a task that was never started must not throw and must not yield entries. */
+    @Test
+    public void pollTest() throws Exception {
+        KafkaSourceTask task = new KafkaSourceTask();
+
+        Collection<SourceDataEntry> list = task.poll();
+        Assert.assertTrue(list == null || list.isEmpty());
+    }
+
+    /** Keys without a separator or with a non-numeric partition are rejected with null. */
+    @Test
+    public void getTopicPartitionRejectsMalformedKeyTest() {
+        java.nio.charset.Charset utf8 = java.nio.charset.Charset.forName("UTF-8");
+
+        Assert.assertNull(KafkaSourceTask.getTopicPartition(
+            java.nio.ByteBuffer.wrap("nodash".getBytes(utf8))));
+        Assert.assertNull(KafkaSourceTask.getTopicPartition(
+            java.nio.ByteBuffer.wrap("topic-x".getBytes(utf8))));
+    }
+}