You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:21:38 UTC

[rocketmq-connect] 02/07: rocketmq-connect-kafka

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit bbb12023c30a11d1b6190d0e00fdb5c6422d6f05
Author: jonnxu <jo...@163.com>
AuthorDate: Fri Jul 5 01:39:45 2019 +0800

    rocketmq-connect-kafka
---
 pom.xml                                            | 210 ++++++++++++++++++
 .../org/apache/rocketmq/connect/kafka/Config.java  | 116 ++++++++++
 .../kafka/connector/KafkaSourceConnector.java      | 103 +++++++++
 .../connect/kafka/connector/KafkaSourceTask.java   | 247 +++++++++++++++++++++
 src/main/resources/connect-kafka-source.properties |  23 ++
 .../kafka/connector/KafkaSourceConnectorTest.java  |  56 +++++
 .../kafka/connector/KafkaSourceTaskTest.java       |  43 ++++
 7 files changed, 798 insertions(+)

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..08712ca
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,210 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-kafka</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>rocketmq-connect-kafka</name>
+    <packaging>jar</packaging> <!-- jar, not pom: pom packaging binds no compile/test phases, so src/main/java would never be built -->
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <scm>
+        <url>https://github.com/openmessaging/openmessaging-connector</url>
+        <connection>scm:git:git@github.com:openmessaging/openmessaging-connector.git</connection>
+        <developerConnection>scm:git:git@github.com:openmessaging/openmessaging-connector.git</developerConnection>
+        <tag>HEAD</tag>
+    </scm>
+
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.11.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <!--
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.5</version>
+            <scope>test</scope>
+        </dependency>
+        -->
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connect-runtime</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+            <version>1.0.0-alpha</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.51</version>
+        </dependency>
+        <dependency>
+            <groupId>io.javalin</groupId>
+            <artifactId>javalin</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java b/src/main/java/org/apache/rocketmq/connect/kafka/Config.java
new file mode 100644
index 0000000..869597e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/Config.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.*;
+
+public class Config {
+
+    public static final String TASK_NUM = "tasks.num"; // number of task configs the connector generates
+    public static final String TOPICS = "kafka.topics"; // comma-separated Kafka topics to subscribe to
+    public static final String GROUP_ID = "kafka.group.id"; // Kafka consumer group id
+    public static final String BOOTSTRAP_SERVER = "kafka.bootstrap.server"; // passed to Kafka as bootstrap.servers
+    public static final String ROCKETMQ_TOPIC = "rocketmq.topic"; // declared but not referenced by the connector or task
+
+    private String bootstrapServers;
+    private String topics;
+    private String groupId;
+
+    public String getTopics() {
+        return topics;
+    }
+
+    public void setTopics(String topics) {
+        this.topics = topics;
+    }
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    /** Keys a connector config must contain; the set is read-only. */
+    public static final Set<String> REQUEST_CONFIG;
+
+    static {
+        // Plain HashSet instead of double-brace initialization, which creates an anonymous subclass.
+        REQUEST_CONFIG = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(TOPICS, GROUP_ID, BOOTSTRAP_SERVER)));
+    }
+
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    private void properties2Object(final KeyValue p, final Object object) {
+        // Best-effort binding: for each public setXxx method, looks up key "xxx" (first letter lower-cased) in p and invokes the setter with the parsed value.
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4); // property name without its first letter
+                    String first = mn.substring(3, 4); // first letter after "set"
+
+                    String key = first.toLowerCase() + tmp; // e.g. setGroupId -> "groupId"; NOTE(review): differs from the dotted constants such as "kafka.group.id" - confirm which keys callers pass
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName(); // conversion is chosen by the first parameter's simple type name
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) {
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue; // unsupported parameter type: skip this setter
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) { // any lookup, parse or invoke failure is silently skipped
+                }
+            }
+        }
+    }
+
+    public static Set<String> getRequestConfig() {
+        return REQUEST_CONFIG;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
new file mode 100644
index 0000000..ba30901
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connect.runtime.common.ConnectKeyValue;
+import io.openmessaging.connect.runtime.config.RuntimeConfigDefine;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import org.apache.rocketmq.connect.kafka.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class KafkaSourceConnector extends SourceConnector{
+    private static final Logger log = LoggerFactory.getLogger(KafkaSourceConnector.class);
+
+    private KeyValue connectConfig;
+
+    public KafkaSourceConnector() {
+        super();
+    }
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+        // Logs every entry, then rejects the config if any key in Config.REQUEST_CONFIG is absent.
+        log.info("KafkaSourceConnector verifyAndSetConfig enter");
+        for ( String key : config.keySet()) {
+            log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key));
+        }
+
+        for(String requestKey : Config.REQUEST_CONFIG){
+            if(!config.containsKey(requestKey)){
+                return "Request Config key: " + requestKey; // names the first missing required key
+            }
+        }
+        this.connectConfig = config; // stored only after all required keys were found
+        return ""; // all required keys present
+    }
+
+    @Override
+    public void start() {
+        log.info("KafkaSourceConnector start enter");
+    }
+
+    @Override
+    public void stop() {
+        log.info("KafkaSourceConnector stop enter");
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return KafkaSourceTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+        // Builds one identical KeyValue per task; the task count is read from Config.TASK_NUM.
+        log.info("Source Connector taskConfigs enter");
+        List<KeyValue> configs = new ArrayList<>();
+        int task_num = connectConfig.getInt(Config.TASK_NUM); // NOTE(review): connectConfig is only assigned in verifyAndSetConfig, and TASK_NUM is not in REQUEST_CONFIG - confirm getInt's result for a missing key
+        log.info("Source Connector taskConfigs: task_num:" + task_num);
+        for (int i=0; i < task_num; ++i) {
+            KeyValue config = new ConnectKeyValue();
+            config.put(Config.BOOTSTRAP_SERVER, connectConfig.getString(Config.BOOTSTRAP_SERVER));
+            config.put(Config.TOPICS, connectConfig.getString(Config.TOPICS));
+            config.put(Config.GROUP_ID, connectConfig.getString(Config.GROUP_ID));
+            // Runtime-level settings are copied through unchanged from the connector config.
+            config.put(RuntimeConfigDefine.CONNECTOR_CLASS, connectConfig.getString(RuntimeConfigDefine.CONNECTOR_CLASS));
+            config.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER));
+            config.put(RuntimeConfigDefine.OMS_DRIVER_URL, connectConfig.getString(RuntimeConfigDefine.OMS_DRIVER_URL));
+            configs.add(config);
+        }
+        return configs;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
new file mode 100644
index 0000000..d4b39e0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.source.SourceTask;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.rocketmq.connect.kafka.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.util.*;
+
+public class KafkaSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaSourceTask.class);
+    private KafkaConsumer<ByteBuffer, ByteBuffer> consumer;
+    private KeyValue config;
+    private List<String> topicList;
+    private List<TopicPartition> currentTPList;
+
+    /**
+     * Polls Kafka for up to one second and wraps every record in a SourceDataEntry.
+     * Never returns null: on failure the error is logged and an empty list is returned.
+     */
+    @Override
+    public Collection<SourceDataEntry> poll() {
+        try {
+            ArrayList<SourceDataEntry> entries = new ArrayList<>();
+            ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(1000);
+            if (records.count() > 0) {
+                log.info("consumer.poll, records.count {}", records.count());
+            }
+            for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
+                String topic_partition = record.topic() + "-" + record.partition();
+                log.info("Received {} record: {} ", topic_partition, record);
+
+                Schema schema = new Schema();
+                List<Field> fields = new ArrayList<>();
+                fields.add(new Field(0, "key", FieldType.BYTES));
+                fields.add(new Field(1, "value", FieldType.BYTES));
+                schema.setName(record.topic());
+                schema.setFields(fields);
+                schema.setDataSource(record.topic());
+                // Encode with UTF-8 explicitly: getTopicPartition decodes this key as UTF-8.
+                ByteBuffer sourcePartition = ByteBuffer.wrap(topic_partition.getBytes(Charset.forName("UTF-8")));
+                ByteBuffer sourcePosition = ByteBuffer.allocate(8);
+                sourcePosition.asLongBuffer().put(record.offset());
+                DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                dataEntryBuilder.entryType(EntryType.CREATE);
+                dataEntryBuilder.queue(record.topic()); //queueName will be set to RocketMQ topic by runtime
+                dataEntryBuilder.timestamp(System.currentTimeMillis());
+                // Kafka allows null keys and null values (tombstones); pass those through as null.
+                ByteBuffer key = record.key();
+                ByteBuffer value = record.value();
+                dataEntryBuilder.putFiled("key", key == null ? null : JSON.toJSONString(key.array()));
+                dataEntryBuilder.putFiled("value", value == null ? null : JSON.toJSONString(value.array()));
+                SourceDataEntry entry = dataEntryBuilder.buildSourceDataEntry(sourcePartition, sourcePosition);
+                entries.add(entry);
+            }
+
+            log.info("poll return entries size {} ", entries.size());
+            return entries;
+        } catch (Exception e) {
+            log.error("poll exception", e);
+        }
+        return new ArrayList<>();
+    }
+
+    /** Creates the Kafka consumer from taskConfig and subscribes it to the configured topics. */
+    @Override
+    public void start(KeyValue taskConfig) {
+        log.info("source task start enter");
+        this.topicList = new ArrayList<>();
+        this.currentTPList = new ArrayList<>();
+        this.config = taskConfig;
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(Config.BOOTSTRAP_SERVER));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(Config.GROUP_ID));
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // NOTE(review): runs alongside the explicit commits in commitOffset - confirm both are intended
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
+        this.consumer = new KafkaConsumer<>(props);
+        // Tolerate whitespace around the comma-separated names, e.g. "a, b".
+        String topics = this.config.getString(Config.TOPICS);
+        for (String topic : topics.split(",")) {
+            String name = topic.trim();
+            if (!name.isEmpty()) {
+                topicList.add(name);
+            }
+        }
+        consumer.subscribe(topicList, new MyRebalanceListener());
+        log.info("source task subscribe topicList {}", topicList);
+    }
+
+    @Override
+    public void stop() {
+        log.info("source task stop enter");
+        try {
+            // Close in finally so a failed offset commit cannot leave the consumer open.
+            try { commitOffset(currentTPList, true); } finally { consumer.close(); }
+        } catch (Exception e) {
+            log.warn("{} consumer {} close exception {}", this, consumer, e);
+        }
+    }
+
+    @Override
+    public void pause() {
+        log.info("source task pause ...");
+    }
+
+    @Override
+    public void resume() {
+        log.info("source task resume ...");
+    }
+
+    public String toString() {
+        String name = ManagementFactory.getRuntimeMXBean().getName();
+        String pid = name.split("@")[0];
+        return "KafkaSourceTask-PID[" + pid + "]-" + Thread.currentThread().toString();
+    }
+
+    /**
+     * Decodes a UTF-8 "topic-partition" key back into a TopicPartition.
+     * Returns null when the buffer cannot be decoded or is not of that form.
+     */
+    public static TopicPartition getTopicPartition(ByteBuffer buffer)
+    {
+        try
+        {
+            CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+            String topic_partition = decoder.decode(buffer.asReadOnlyBuffer()).toString();
+            // Topic names may themselves contain '-', so split on the LAST dash.
+            int index = topic_partition.lastIndexOf('-');
+            if (index > 0) {
+                // substring's end index is exclusive: (0, index) is the whole topic name.
+                String topic = topic_partition.substring(0, index);
+                int partition = Integer.parseInt(topic_partition.substring(index + 1));
+                return new TopicPartition(topic, partition);
+            }
+        }
+        catch (Exception ex)
+        {
+            log.warn("getString Exception {}", ex);
+        }
+        return null;
+    }
+
+    /**
+     * Commits the stored positions of tpList to Kafka (sync if isClose, else async); no-op when tpList is null/empty.
+     */
+    private void commitOffset(Collection<TopicPartition> tpList, boolean isClose) {
+        if(tpList == null || tpList.isEmpty())
+            return;
+        log.info("commitOffset {} topic partition {}", KafkaSourceTask.this, tpList);
+        List<ByteBuffer> topic_partition_list = new ArrayList<>();
+        for (TopicPartition tp : tpList) {
+            topic_partition_list.add(ByteBuffer.wrap((tp.topic()+"-"+tp.partition()).getBytes(Charset.forName("UTF-8"))));
+        }
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
+        Map<ByteBuffer, ByteBuffer> topic_position_map = context.positionStorageReader().getPositions(topic_partition_list);
+        for (Map.Entry<ByteBuffer, ByteBuffer> entry : topic_position_map.entrySet()) {
+            TopicPartition tp = getTopicPartition(entry.getKey());
+            if (tp != null && tpList.contains(tp)) {
+                //positionStorage store more than this task's topic and partition
+                try {
+                    // poll() stores the offset of the record it read; Kafka expects the NEXT offset to read.
+                    long local_offset = entry.getValue().asLongBuffer().get();
+                    commitOffsets.put(tp, new OffsetAndMetadata(local_offset + 1));
+                } catch (Exception e) {
+                    log.warn("commitOffset get local offset exception {}", e);
+                }
+            }
+        }
+        commitOffsets.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) ->
+                log.info("commitOffset {}, TopicPartition:{} offset:{}", KafkaSourceTask.this, entry.getKey(), entry.getValue()));
+        if (!commitOffsets.isEmpty()) {
+            if (isClose) {
+                consumer.commitSync(commitOffsets);
+            } else {
+                consumer.commitAsync(commitOffsets, new MyOffsetCommitCallback());
+            }
+        }
+    }
+
+    /** Logs each partition/offset pair of an asynchronous commit that failed; silent on success. */
+    private class MyOffsetCommitCallback implements OffsetCommitCallback {
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
+            if (e == null) {
+                return;
+            }
+            log.warn("commit async excepiton {}", e);
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> failed : map.entrySet()) {
+                log.warn("commit exception, TopicPartition:{} offset: {}", failed.getKey().toString(), failed.getValue().offset());
+            }
+        }
+    }
+
+    /** Tracks the partitions currently assigned to this task and commits positions when they are revoked. */
+    private class MyRebalanceListener implements ConsumerRebalanceListener {
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            // Replace the previous assignment wholesale.
+            currentTPList.clear();
+            currentTPList.addAll(partitions);
+            for (TopicPartition assigned : currentTPList) {
+                log.info("onPartitionsAssigned  TopicPartition {}", assigned);
+            }
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            // Exceptions from the commit are logged, not propagated.
+            log.info("onPartitionsRevoked {} Partitions revoked", KafkaSourceTask.this);
+            try {
+                commitOffset(partitions, false);
+            } catch (Exception commitFailure) {
+                log.warn("onPartitionsRevoked exception", commitFailure);
+            }
+        }
+    }
+}
diff --git a/src/main/resources/connect-kafka-source.properties b/src/main/resources/connect-kafka-source.properties
new file mode 100644
index 0000000..f974cb9
--- /dev/null
+++ b/src/main/resources/connect-kafka-source.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Key names must match the constants in Config (tasks.num, kafka.topics, kafka.group.id, kafka.bootstrap.server).
+name=rocketmq-connect-kafka
+connector-class=org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector
+oms-driver-url=oms:rocketmq://101.132.96.164:9876/default:default
+source-record-converter=io.openmessaging.connect.runtime.converter.JsonConverter
+tasks.num=2
+kafka.bootstrap.server=47.112.213.204:9092,47.112.213.204:9092
+kafka.topics=jonnxu
+kafka.group.id=connect-kafka-source-consumer-group
diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
new file mode 100644
index 0000000..c64b5e7
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.kafka.Config;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KafkaSourceConnectorTest {
+    KafkaSourceConnector connector = new KafkaSourceConnector();
+
+    @Test
+    public void verifyAndSetConfigTest() {
+        KeyValue keyValue = new DefaultKeyValue();
+        // Each still-missing required key must be reported with the connector's exact message text.
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            assertEquals("Request Config key: " + requestKey, connector.verifyAndSetConfig(keyValue));
+            keyValue.put(requestKey, requestKey);
+        }
+        assertEquals("", connector.verifyAndSetConfig(keyValue));
+    }
+
+    @Test
+    public void taskClassTest() {
+        assertEquals(KafkaSourceTask.class, connector.taskClass()); // taskClass() returns the task type, not the connector itself
+    }
+
+    @Test
+    public void taskConfigsTest() {
+        assertEquals(connector.taskConfigs().get(0), null); // NOTE(review): no config has been set yet, so taskConfigs() dereferences a null connectConfig here - confirm this can pass
+        KeyValue keyValue = new DefaultKeyValue();
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            keyValue.put(requestKey, requestKey);
+        }
+        connector.verifyAndSetConfig(keyValue);
+        assertEquals(connector.taskConfigs().get(0), keyValue); // NOTE(review): tasks.num is never set and taskConfigs() also copies runtime keys - verify a first element exists and can equal keyValue
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java
new file mode 100644
index 0000000..57239f6
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class KafkaSourceTaskTest {
+
+    @Test
+    public void pollTest() throws Exception {
+        KafkaSourceTask task = new KafkaSourceTask();
+        Field config = KafkaSourceTask.class.getDeclaredField("config");
+        config.setAccessible(true);
+        // NOTE(review): the reflected field is never assigned and start() is never called, so no consumer exists when poll() runs - confirm how a record is expected here.
+        Collection<SourceDataEntry> list = task.poll();
+        Assert.assertEquals(list.size(), 1);
+
+        list = task.poll();
+        Assert.assertEquals(list.size(), 0);
+
+    }
+}