Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:21:36 UTC

[rocketmq-connect] branch master updated (b3cbb0b -> 00d60fb)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git.


    from b3cbb0b  Add 'connector/rocketmq-connect-jms/' from commit '37a4db762dfac7496e2e0e40cf7cf25c4bd0f01d'
     new d0ff0aa  Init rocketmq-kafka connect project
     new bbb1202  rocketmq-connect-kafka
     new 9aa0640  Update README.md for rocketmq-connect-kafka (#338)
     new 598de5d  [ISSUE #341] Add wakeup before kafka consumer close to wakeup consumer poll (#342)
     new e74c022  [ISSUE #420]remove openmessage-runtime dependency
     new 59ac9fd  [rocketmq-connect-kafka]: Completion method KafkaSourceTask#pause(), KafkaSourceTask#resume(). (#854)
     new 00d60fb  Add 'connector/rocketmq-connect-kafka/' from commit '59ac9fd7b7fd11ad7943b5ef30a3f95b20acadd7'

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 connector/rocketmq-connect-kafka/README.md         |  30 +++
 .../pom.xml                                        |  77 ++++---
 .../connect/kafka/config/ConfigDefine.java         |  67 ++++++
 .../rocketmq/connect/kafka/config/ConfigUtil.java  |  14 +-
 .../kafka/connector/KafkaSourceConnector.java      | 104 +++++++++
 .../connect/kafka/connector/KafkaSourceTask.java   | 250 +++++++++++++++++++++
 .../main/resources/connect-kafka-source.properties |  12 +-
 .../kafka/connector/KafkaSourceConnectorTest.java} |  27 ++-
 .../kafka/connector/KafkaSourceTaskTest.java}      |  30 +--
 9 files changed, 543 insertions(+), 68 deletions(-)
 create mode 100644 connector/rocketmq-connect-kafka/README.md
 copy connector/{rocketmq-connect-jms => rocketmq-connect-kafka}/pom.xml (82%)
 create mode 100644 connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
 copy rocketmq-connect-sample/src/main/java/org/apache/rocketmq/connect/file/FileUtils.java => connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java (90%)
 create mode 100644 connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
 create mode 100644 connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
 copy rocketmq-connect-cli/connectAdmin => connector/rocketmq-connect-kafka/src/main/resources/connect-kafka-source.properties (65%)
 copy connector/{rocketmq-connect-activemq/src/test/java/org/apache/rocketmq/connect/activemq/connector/ActivemqConnectorTest.java => rocketmq-connect-kafka/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java} (69%)
 copy connector/{rocketmq-connect-cassandra/src/main/java/org/apache/rocketmq/connect/cassandra/schema/column/TimeColumnParser.java => rocketmq-connect-kafka/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java} (53%)

[rocketmq-connect] 05/07: [ISSUE #420]remove openmessage-runtime dependency

Posted by zh...@apache.org.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit e74c022c593a99b5c79d8e8a774254e7ef44f01a
Author: sanchen <sa...@chenyiliu.com>
AuthorDate: Sun Oct 6 16:42:42 2019 +0800

    [ISSUE #420]remove openmessage-runtime dependency
---
 README.md                                          |  2 +-
 pom.xml                                            |  5 --
 .../connect/kafka/config/ConfigDefine.java         | 67 ++++++++++++++++++++++
 .../kafka/{Config.java => config/ConfigUtil.java}  | 58 ++-----------------
 .../kafka/connector/KafkaSourceConnector.java      | 35 +++++------
 .../connect/kafka/connector/KafkaSourceTask.java   |  8 +--
 src/main/resources/connect-kafka-source.properties |  1 -
 .../kafka/connector/KafkaSourceConnectorTest.java  | 15 ++---
 8 files changed, 104 insertions(+), 87 deletions(-)

diff --git a/README.md b/README.md
index d0917cb..213c8fa 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@
 
 **Start the Connector**
 
-http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","oms-driver-url":"oms: rocketmq://127.0.0.1:9876/default:default","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"io.openmessaging.connect.runtime.converter.JsonConverter"}
+http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
 
 **Check the Connector's running status**
 
diff --git a/pom.xml b/pom.xml
index 08712ca..ccc4cc1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,11 +158,6 @@
         -->
         <dependency>
             <groupId>io.openmessaging</groupId>
-            <artifactId>openmessaging-connect-runtime</artifactId>
-            <version>0.0.1-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>io.openmessaging</groupId>
             <artifactId>openmessaging-connector</artifactId>
             <version>0.1.0-beta</version>
         </dependency>
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
new file mode 100644
index 0000000..9a7f1ba
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.config;
+
+import java.util.*;
+
+public class ConfigDefine {
+
+    public static String TASK_NUM = "tasks.num";
+    public static String TOPICS = "kafka.topics";
+    public static String GROUP_ID = "kafka.group.id";
+    public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server";
+    public static String CONNECTOR_CLASS = "connector-class";
+    public static String SOURCE_RECORD_CONVERTER = "source-record-converter";
+    public static String ROCKETMQ_TOPIC = "rocketmq.topic";
+
+    private String bootstrapServers;
+    private String topics;
+    private String groupId;
+
+    public String getTopics() {
+        return topics;
+    }
+
+    public void setTopics(String topics) {
+        this.topics = topics;
+    }
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
+        {
+            add(TOPICS);
+            add(GROUP_ID);
+            add(BOOTSTRAP_SERVER);
+        }
+    };
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
similarity index 66%
rename from src/main/java/org/apache/rocketmq/connect/kafka/Config.java
rename to src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
index 869597e..0587dae 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
@@ -14,62 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.rocketmq.connect.kafka;
+package org.apache.rocketmq.connect.kafka.config;
 
 import io.openmessaging.KeyValue;
-import java.lang.reflect.Method;
-import java.util.*;
-
-public class Config {
-
-    public static String TASK_NUM = "tasks.num";
-    public static String TOPICS = "kafka.topics";
-    public static String GROUP_ID = "kafka.group.id";
-    public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server";
-    public static String ROCKETMQ_TOPIC = "rocketmq.topic";
-
-    private String bootstrapServers;
-    private String topics;
-    private String groupId;
 
-    public String getTopics() {
-        return topics;
-    }
-
-    public void setTopics(String topics) {
-        this.topics = topics;
-    }
-
-    public String getBootstrapServers() {
-        return bootstrapServers;
-    }
+import java.lang.reflect.Method;
 
-    public void setBootstrapServers(String bootstrapServers) {
-        this.bootstrapServers = bootstrapServers;
-    }
+public class ConfigUtil {
 
-    public String getGroupId() {
-        return groupId;
-    }
+    public static <T> void load(KeyValue props, Object object) {
 
-    public void setGroupId(String groupId) {
-        this.groupId = groupId;
+        properties2Object(props, object);
     }
 
-    public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
-        {
-            add(TOPICS);
-            add(GROUP_ID);
-            add(BOOTSTRAP_SERVER);
-        }
-    };
-
-    public void load(KeyValue props) {
-        properties2Object(props, this);
-    }
-
-    private void properties2Object(final KeyValue p, final Object object) {
+    private static <T> void properties2Object(final KeyValue p, final Object object) {
 
         Method[] methods = object.getClass().getMethods();
         for (Method method : methods) {
@@ -109,8 +67,4 @@ public class Config {
             }
         }
     }
-
-    public static Set<String> getRequestConfig() {
-        return REQUEST_CONFIG;
-    }
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
index ba30901..567a8e9 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
@@ -18,18 +18,17 @@
 package org.apache.rocketmq.connect.kafka.connector;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.connect.runtime.common.ConnectKeyValue;
-import io.openmessaging.connect.runtime.config.RuntimeConfigDefine;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
-import org.apache.rocketmq.connect.kafka.Config;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class KafkaSourceConnector extends SourceConnector{
+public class KafkaSourceConnector extends SourceConnector {
     private static final Logger log = LoggerFactory.getLogger(KafkaSourceConnector.class);
 
     private KeyValue connectConfig;
@@ -42,12 +41,12 @@ public class KafkaSourceConnector extends SourceConnector{
     public String verifyAndSetConfig(KeyValue config) {
 
         log.info("KafkaSourceConnector verifyAndSetConfig enter");
-        for ( String key : config.keySet()) {
+        for (String key : config.keySet()) {
             log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key));
         }
 
-        for(String requestKey : Config.REQUEST_CONFIG){
-            if(!config.containsKey(requestKey)){
+        for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
                 return "Request Config key: " + requestKey;
             }
         }
@@ -82,20 +81,22 @@ public class KafkaSourceConnector extends SourceConnector{
 
     @Override
     public List<KeyValue> taskConfigs() {
+        if (connectConfig == null) {
+            return new ArrayList<KeyValue>();
+        }
 
         log.info("Source Connector taskConfigs enter");
         List<KeyValue> configs = new ArrayList<>();
-        int task_num = connectConfig.getInt(Config.TASK_NUM);
+        int task_num = connectConfig.getInt(ConfigDefine.TASK_NUM);
         log.info("Source Connector taskConfigs: task_num:" + task_num);
-        for (int i=0; i < task_num; ++i) {
-            KeyValue config = new ConnectKeyValue();
-            config.put(Config.BOOTSTRAP_SERVER, connectConfig.getString(Config.BOOTSTRAP_SERVER));
-            config.put(Config.TOPICS, connectConfig.getString(Config.TOPICS));
-            config.put(Config.GROUP_ID, connectConfig.getString(Config.GROUP_ID));
-
-            config.put(RuntimeConfigDefine.CONNECTOR_CLASS, connectConfig.getString(RuntimeConfigDefine.CONNECTOR_CLASS));
-            config.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER));
-            config.put(RuntimeConfigDefine.OMS_DRIVER_URL, connectConfig.getString(RuntimeConfigDefine.OMS_DRIVER_URL));
+        for (int i = 0; i < task_num; ++i) {
+            KeyValue config = new DefaultKeyValue();
+            config.put(ConfigDefine.BOOTSTRAP_SERVER, connectConfig.getString(ConfigDefine.BOOTSTRAP_SERVER));
+            config.put(ConfigDefine.TOPICS, connectConfig.getString(ConfigDefine.TOPICS));
+            config.put(ConfigDefine.GROUP_ID, connectConfig.getString(ConfigDefine.GROUP_ID));
+
+            config.put(ConfigDefine.CONNECTOR_CLASS, connectConfig.getString(ConfigDefine.CONNECTOR_CLASS));
+            config.put(ConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(ConfigDefine.SOURCE_RECORD_CONVERTER));
             configs.add(config);
         }
         return configs;
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
index 1f7ed00..6122b0e 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -23,7 +23,7 @@ import io.openmessaging.connector.api.data.*;
 import io.openmessaging.connector.api.source.SourceTask;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.rocketmq.connect.kafka.Config;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,8 +97,8 @@ public class KafkaSourceTask extends SourceTask {
         this.currentTPList = new ArrayList<>();
         this.config = taskConfig;
         Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(Config.BOOTSTRAP_SERVER));
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(Config.GROUP_ID));
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(ConfigDefine.BOOTSTRAP_SERVER));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(ConfigDefine.GROUP_ID));
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
@@ -106,7 +106,7 @@ public class KafkaSourceTask extends SourceTask {
 
         this.consumer = new KafkaConsumer<>(props);
 
-        String topics = this.config.getString(Config.TOPICS);
+        String topics = this.config.getString(ConfigDefine.TOPICS);
         for (String topic : topics.split(",")) {
             if (!topic.isEmpty()) {
                 topicList.add(topic);
diff --git a/src/main/resources/connect-kafka-source.properties b/src/main/resources/connect-kafka-source.properties
index f974cb9..5ab36ed 100644
--- a/src/main/resources/connect-kafka-source.properties
+++ b/src/main/resources/connect-kafka-source.properties
@@ -15,7 +15,6 @@
 
 name=rocketmq-connect-kafka
 connector-class=org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector
-oms-driver-url=oms:rocketmq://101.132.96.164:9876/default:default
 source-record-converter=io.openmessaging.connect.runtime.converter.JsonConverter
 task.num=2
 kafka.bootstrap.server=47.112.213.204:9092;47.112.213.204:9092
diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
index c64b5e7..1e12ca3 100644
--- a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.connect.kafka.connector;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.rocketmq.connect.kafka.Config;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -31,8 +31,8 @@ public class KafkaSourceConnectorTest {
     public void verifyAndSetConfigTest() {
         KeyValue keyValue = new DefaultKeyValue();
 
-        for (String requestKey : Config.REQUEST_CONFIG) {
-            assertEquals(connector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey);
+        for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+            assertEquals(connector.verifyAndSetConfig(keyValue), "Request Config key: " + requestKey);
             keyValue.put(requestKey, requestKey);
         }
         assertEquals(connector.verifyAndSetConfig(keyValue), "");
@@ -40,17 +40,18 @@ public class KafkaSourceConnectorTest {
 
     @Test
     public void taskClassTest() {
-        assertEquals(connector.taskClass(), KafkaSourceConnector.class);
+        assertEquals(connector.taskClass(), KafkaSourceTask.class);
     }
 
     @Test
     public void taskConfigsTest() {
-        assertEquals(connector.taskConfigs().get(0), null);
+        assertEquals(connector.taskConfigs().size(), 0);
         KeyValue keyValue = new DefaultKeyValue();
-        for (String requestKey : Config.REQUEST_CONFIG) {
+        for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
             keyValue.put(requestKey, requestKey);
         }
+        keyValue.put(ConfigDefine.TASK_NUM,1);
         connector.verifyAndSetConfig(keyValue);
-        assertEquals(connector.taskConfigs().get(0), keyValue);
+        assertEquals(connector.taskConfigs().get(0).getString(ConfigDefine.TOPICS), keyValue.getString(ConfigDefine.TOPICS));
     }
 }

[rocketmq-connect] 07/07: Add 'connector/rocketmq-connect-kafka/' from commit '59ac9fd7b7fd11ad7943b5ef30a3f95b20acadd7'

Posted by zh...@apache.org.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 00d60fb18f82dd396d1ebc0423cde95f32f8fa9e
Merge: b3cbb0b 59ac9fd
Author: odbozhou <87...@qq.com>
AuthorDate: Wed Mar 2 11:21:08 2022 +0800

    Add 'connector/rocketmq-connect-kafka/' from commit '59ac9fd7b7fd11ad7943b5ef30a3f95b20acadd7'
    
    git-subtree-dir: connector/rocketmq-connect-kafka
    git-subtree-mainline: b3cbb0b49f361aeb83fc1ec2d26f00c1d6b6e449
    git-subtree-split: 59ac9fd7b7fd11ad7943b5ef30a3f95b20acadd7

 connector/rocketmq-connect-kafka/README.md         |  30 +++
 connector/rocketmq-connect-kafka/pom.xml           | 205 +++++++++++++++++
 .../connect/kafka/config/ConfigDefine.java         |  67 ++++++
 .../rocketmq/connect/kafka/config/ConfigUtil.java  |  70 ++++++
 .../kafka/connector/KafkaSourceConnector.java      | 104 +++++++++
 .../connect/kafka/connector/KafkaSourceTask.java   | 250 +++++++++++++++++++++
 .../main/resources/connect-kafka-source.properties |  22 ++
 .../kafka/connector/KafkaSourceConnectorTest.java  |  57 +++++
 .../kafka/connector/KafkaSourceTaskTest.java       |  43 ++++
 9 files changed, 848 insertions(+)

diff --cc connector/rocketmq-connect-kafka/README.md
index 0000000,213c8fa..213c8fa
mode 000000,100644..100644
--- a/connector/rocketmq-connect-kafka/README.md
+++ b/connector/rocketmq-connect-kafka/README.md
diff --cc connector/rocketmq-connect-kafka/pom.xml
index 0000000,ccc4cc1..ccc4cc1
mode 000000,100644..100644
--- a/connector/rocketmq-connect-kafka/pom.xml
+++ b/connector/rocketmq-connect-kafka/pom.xml
diff --cc connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
index 0000000,9a7f1ba..9a7f1ba
mode 000000,100644..100644
--- a/connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
+++ b/connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
diff --cc connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
index 0000000,0587dae..0587dae
mode 000000,100644..100644
--- a/connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
+++ b/connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
diff --cc connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
index 0000000,680df6e..680df6e
mode 000000,100644..100644
--- a/connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
+++ b/connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
diff --cc connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
index 0000000,f077ac0..f077ac0
mode 000000,100644..100644
--- a/connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
+++ b/connector/rocketmq-connect-kafka/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
diff --cc connector/rocketmq-connect-kafka/src/main/resources/connect-kafka-source.properties
index 0000000,5ab36ed..5ab36ed
mode 000000,100644..100644
--- a/connector/rocketmq-connect-kafka/src/main/resources/connect-kafka-source.properties
+++ b/connector/rocketmq-connect-kafka/src/main/resources/connect-kafka-source.properties
diff --cc connector/rocketmq-connect-kafka/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
index 0000000,1e12ca3..1e12ca3
mode 000000,100644..100644
--- a/connector/rocketmq-connect-kafka/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
+++ b/connector/rocketmq-connect-kafka/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
diff --cc connector/rocketmq-connect-kafka/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java
index 0000000,57239f6..57239f6
mode 000000,100644..100644
--- a/connector/rocketmq-connect-kafka/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java
+++ b/connector/rocketmq-connect-kafka/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java

[rocketmq-connect] 02/07: rocketmq-connect-kafka

Posted by zh...@apache.org.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit bbb12023c30a11d1b6190d0e00fdb5c6422d6f05
Author: jonnxu <jo...@163.com>
AuthorDate: Fri Jul 5 01:39:45 2019 +0800

    rocketmq-connect-kafka
---
 pom.xml                                            | 210 ++++++++++++++++++
 .../org/apache/rocketmq/connect/kafka/Config.java  | 116 ++++++++++
 .../kafka/connector/KafkaSourceConnector.java      | 103 +++++++++
 .../connect/kafka/connector/KafkaSourceTask.java   | 247 +++++++++++++++++++++
 src/main/resources/connect-kafka-source.properties |  23 ++
 .../kafka/connector/KafkaSourceConnectorTest.java  |  56 +++++
 .../kafka/connector/KafkaSourceTaskTest.java       |  43 ++++
 7 files changed, 798 insertions(+)

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..08712ca
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,210 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-kafka</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>rocketmq-connect-kafka</name>
+    <packaging>pom</packaging>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+            <distribution>repo</distribution>
+        </license>
+    </licenses>
+
+    <scm>
+        <url>https://github.com/openmessaging/openmessaging-connector</url>
+        <connection>scm:git:git@github.com:openmessaging/openmessaging-connector.git</connection>
+        <developerConnection>scm:git:git@github.com:openmessaging/openmessaging-connector.git</developerConnection>
+        <tag>HEAD</tag>
+    </scm>
+
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>0.11.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <!--
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.5</version>
+            <scope>test</scope>
+        </dependency>
+        -->
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connect-runtime</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+            <version>1.0.0-alpha</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+            <version>4.3.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.51</version>
+        </dependency>
+        <dependency>
+            <groupId>io.javalin</groupId>
+            <artifactId>javalin</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>1.2</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java b/src/main/java/org/apache/rocketmq/connect/kafka/Config.java
new file mode 100644
index 0000000..869597e
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/Config.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.Method;
+import java.util.*;
+
+public class Config {
+
+    public static String TASK_NUM = "tasks.num";
+    public static String TOPICS = "kafka.topics";
+    public static String GROUP_ID = "kafka.group.id";
+    public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server";
+    public static String ROCKETMQ_TOPIC = "rocketmq.topic";
+
+    private String bootstrapServers;
+    private String topics;
+    private String groupId;
+
+    public String getTopics() {
+        return topics;
+    }
+
+    public void setTopics(String topics) {
+        this.topics = topics;
+    }
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
+        {
+            add(TOPICS);
+            add(GROUP_ID);
+            add(BOOTSTRAP_SERVER);
+        }
+    };
+
+    public void load(KeyValue props) {
+        properties2Object(props, this);
+    }
+
+    private void properties2Object(final KeyValue p, final Object object) {
+
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            if (mn.startsWith("set")) {
+                try {
+                    String tmp = mn.substring(4);
+                    String first = mn.substring(3, 4);
+
+                    String key = first.toLowerCase() + tmp;
+                    String property = p.getString(key);
+                    if (property != null) {
+                        Class<?>[] pt = method.getParameterTypes();
+                        if (pt != null && pt.length > 0) {
+                            String cn = pt[0].getSimpleName();
+                            Object arg;
+                            if (cn.equals("int") || cn.equals("Integer")) {
+                                arg = Integer.parseInt(property);
+                            } else if (cn.equals("long") || cn.equals("Long")) {
+                                arg = Long.parseLong(property);
+                            } else if (cn.equals("double") || cn.equals("Double")) {
+                                arg = Double.parseDouble(property);
+                            } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                                arg = Boolean.parseBoolean(property);
+                            } else if (cn.equals("float") || cn.equals("Float")) {
+                                arg = Float.parseFloat(property);
+                            } else if (cn.equals("String")) {
+                                arg = property;
+                            } else {
+                                continue;
+                            }
+                            method.invoke(object, arg);
+                        }
+                    }
+                } catch (Throwable ignored) {
+                }
+            }
+        }
+    }
+
+    public static Set<String> getRequestConfig() {
+        return REQUEST_CONFIG;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
new file mode 100644
index 0000000..ba30901
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connect.runtime.common.ConnectKeyValue;
+import io.openmessaging.connect.runtime.config.RuntimeConfigDefine;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import org.apache.rocketmq.connect.kafka.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class KafkaSourceConnector extends SourceConnector{
+    private static final Logger log = LoggerFactory.getLogger(KafkaSourceConnector.class);
+
+    private KeyValue connectConfig;
+
+    public KafkaSourceConnector() {
+        super();
+    }
+
+    @Override
+    public String verifyAndSetConfig(KeyValue config) {
+
+        log.info("KafkaSourceConnector verifyAndSetConfig enter");
+        for ( String key : config.keySet()) {
+            log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key));
+        }
+
+        for(String requestKey : Config.REQUEST_CONFIG){
+            if(!config.containsKey(requestKey)){
+                return "Request Config key: " + requestKey;
+            }
+        }
+        this.connectConfig = config;
+        return "";
+    }
+
+    @Override
+    public void start() {
+        log.info("KafkaSourceConnector start enter");
+    }
+
+    @Override
+    public void stop() {
+        log.info("KafkaSourceConnector stop enter");
+    }
+
+    @Override
+    public void pause() {
+
+    }
+
+    @Override
+    public void resume() {
+
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return KafkaSourceTask.class;
+    }
+
+    @Override
+    public List<KeyValue> taskConfigs() {
+
+        log.info("Source Connector taskConfigs enter");
+        List<KeyValue> configs = new ArrayList<>();
+        int task_num = connectConfig.getInt(Config.TASK_NUM);
+        log.info("Source Connector taskConfigs: task_num:" + task_num);
+        for (int i=0; i < task_num; ++i) {
+            KeyValue config = new ConnectKeyValue();
+            config.put(Config.BOOTSTRAP_SERVER, connectConfig.getString(Config.BOOTSTRAP_SERVER));
+            config.put(Config.TOPICS, connectConfig.getString(Config.TOPICS));
+            config.put(Config.GROUP_ID, connectConfig.getString(Config.GROUP_ID));
+
+            config.put(RuntimeConfigDefine.CONNECTOR_CLASS, connectConfig.getString(RuntimeConfigDefine.CONNECTOR_CLASS));
+            config.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER));
+            config.put(RuntimeConfigDefine.OMS_DRIVER_URL, connectConfig.getString(RuntimeConfigDefine.OMS_DRIVER_URL));
+            configs.add(config);
+        }
+        return configs;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
new file mode 100644
index 0000000..d4b39e0
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.source.SourceTask;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.rocketmq.connect.kafka.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.util.*;
+
+public class KafkaSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaSourceTask.class);
+    private KafkaConsumer<ByteBuffer, ByteBuffer> consumer;
+    private KeyValue config;
+    private List<String> topicList;
+    private List<TopicPartition> currentTPList;
+
+    @Override
+    public Collection<SourceDataEntry> poll() {
+
+        try {
+            ArrayList<SourceDataEntry> entries = new ArrayList<>();
+            ConsumerRecords<ByteBuffer, ByteBuffer> records = consumer.poll(1000);
+            if (records.count() > 0) {
+                log.info("consumer.poll, records.count {}", records.count());
+            }
+            for (ConsumerRecord<ByteBuffer, ByteBuffer> record : records) {
+                String topic_partition = record.topic() + "-" + record.partition();
+                log.info("Received {} record: {} ", topic_partition, record);
+
+                Schema schema = new Schema();
+                List<Field> fields = new ArrayList<>();
+                fields.add(new Field(0, "key", FieldType.BYTES));
+                fields.add(new Field(1, "value", FieldType.BYTES));
+                schema.setName(record.topic());
+                schema.setFields(fields);
+                schema.setDataSource(record.topic());
+
+                ByteBuffer sourcePartition = ByteBuffer.wrap(topic_partition.getBytes());
+                ByteBuffer sourcePosition = ByteBuffer.allocate(8);
+                sourcePosition.asLongBuffer().put(record.offset());
+
+                DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                dataEntryBuilder.entryType(EntryType.CREATE);
+                dataEntryBuilder.queue(record.topic()); //queueName will be set to RocketMQ topic by runtime
+                dataEntryBuilder.timestamp(System.currentTimeMillis());
+                if (record.key() != null) {
+                    dataEntryBuilder.putFiled("key", JSON.toJSONString(record.key().array()));
+                } else {
+                    dataEntryBuilder.putFiled("key", null);
+                }
+                dataEntryBuilder.putFiled("value", JSON.toJSONString(record.value().array()));
+                SourceDataEntry entry = dataEntryBuilder.buildSourceDataEntry(sourcePartition, sourcePosition);
+                entries.add(entry);
+            }
+
+            log.info("poll return entries size {} ", entries.size());
+            return entries;
+        } catch (Exception e) {
+            e.printStackTrace();
+            log.error("poll exception", e);
+        }
+        return null;
+    }
+
+    @Override
+    public void start(KeyValue taskConfig) {
+        log.info("source task start enter");
+        this.topicList = new ArrayList<>();
+        this.currentTPList = new ArrayList<>();
+        this.config = taskConfig;
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(Config.BOOTSTRAP_SERVER));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(Config.GROUP_ID));
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
+
+        this.consumer = new KafkaConsumer<>(props);
+
+        String topics = this.config.getString(Config.TOPICS);
+        for (String topic : topics.split(",")) {
+            if (!topic.isEmpty()) {
+                topicList.add(topic);
+            }
+        }
+
+        consumer.subscribe(topicList, new MyRebalanceListener());
+        log.info("source task subscribe topicList {}", topicList);
+    }
+
+    @Override
+    public void stop() {
+        log.info("source task stop enter");
+        try {
+            commitOffset(currentTPList, true);
+            consumer.close();
+        } catch (Exception e) {
+            log.warn("{} consumer {} close exception {}", this, consumer, e);
+        }
+    }
+
+    @Override
+    public void pause() {
+        log.info("source task pause ...");
+    }
+
+    @Override
+    public void resume() {
+        log.info("source task resume ...");
+    }
+
+    public String toString() {
+        String name = ManagementFactory.getRuntimeMXBean().getName();
+        String pid = name.split("@")[0];
+        return "KafkaSourceTask-PID[" + pid + "]-" + Thread.currentThread().toString();
+    }
+
+    public static TopicPartition getTopicPartition(ByteBuffer buffer)
+    {
+        Charset charset = null;
+        CharsetDecoder decoder = null;
+        CharBuffer charBuffer = null;
+        try
+        {
+            charset = Charset.forName("UTF-8");
+            decoder = charset.newDecoder();
+            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
+            String topic_partition = charBuffer.toString();
+            int index = topic_partition.lastIndexOf('-');
+            if (index != -1 && index > 1) {
+                String topic = topic_partition.substring(0, index);
+                int partition = Integer.parseInt(topic_partition.substring(index + 1));
+                return new TopicPartition(topic, partition);
+            }
+        }
+        catch (Exception ex)
+        {
+            ex.printStackTrace();
+            log.warn("getString Exception {}", ex);
+        }
+        return null;
+    }
+
+    private void commitOffset(Collection<TopicPartition> tpList, boolean isClose) {
+
+        if(tpList == null || tpList.isEmpty())
+            return;
+
+        log.info("commitOffset {} topic partition {}", KafkaSourceTask.this, tpList);
+        List<ByteBuffer> topic_partition_list = new ArrayList<>();
+        for (TopicPartition tp : tpList) {
+            topic_partition_list.add(ByteBuffer.wrap((tp.topic()+"-"+tp.partition()).getBytes()));
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
+        Map<ByteBuffer, ByteBuffer> topic_position_map = context.positionStorageReader().getPositions(topic_partition_list);
+        for (Map.Entry<ByteBuffer, ByteBuffer> entry : topic_position_map.entrySet()) {
+            TopicPartition tp = getTopicPartition(entry.getKey());
+            if (tp != null && tpList.contains(tp)) {
+                //positionStorage store more than this task's topic and partition
+                try {
+                    long local_offset = entry.getValue().asLongBuffer().get();
+                    commitOffsets.put(tp, new OffsetAndMetadata(local_offset));
+                } catch (Exception e) {
+                    log.warn("commitOffset get local offset exception {}", e);
+                }
+            }
+        }
+
+        commitOffsets.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) ->
+                log.info("commitOffset {}, TopicPartition:{} offset:{}", KafkaSourceTask.this, entry.getKey(), entry.getValue()));
+        if (!commitOffsets.isEmpty()) {
+            if (isClose) {
+                consumer.commitSync(commitOffsets);
+            } else {
+                consumer.commitAsync(commitOffsets, new MyOffsetCommitCallback());
+            }
+        }
+    }
+
+    private class MyOffsetCommitCallback implements OffsetCommitCallback {
+
+        @Override
+        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
+            if (e != null) {
+                log.warn("commit async excepiton {}", e);
+                map.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) -> {
+                    log.warn("commit exception, TopicPartition:{} offset: {}", entry.getKey().toString(), entry.getValue().offset());
+                });
+                return;
+            }
+        }
+    }
+
+    private class MyRebalanceListener implements ConsumerRebalanceListener {
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+
+            currentTPList.clear();
+            for (TopicPartition tp : partitions) {
+                currentTPList.add(tp);
+            }
+            currentTPList.stream().forEach((TopicPartition tp)-> log.info("onPartitionsAssigned  TopicPartition {}", tp));
+        }
+
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+
+            log.info("onPartitionsRevoked {} Partitions revoked", KafkaSourceTask.this);
+            try {
+                commitOffset(partitions, false);
+            } catch (Exception e) {
+                log.warn("onPartitionsRevoked exception", e);
+            }
+        }
+    }
+}
diff --git a/src/main/resources/connect-kafka-source.properties b/src/main/resources/connect-kafka-source.properties
new file mode 100644
index 0000000..f974cb9
--- /dev/null
+++ b/src/main/resources/connect-kafka-source.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=rocketmq-connect-kafka
+connector-class=org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector
+oms-driver-url=oms:rocketmq://101.132.96.164:9876/default:default
+source-record-converter=io.openmessaging.connect.runtime.converter.JsonConverter
+tasks.num=2
+kafka.bootstrap.server=47.112.213.204:9092,47.112.213.204:9092
+kafka.topics=jonnxu
+kafka.group.id=connect-kafka-source-consumer-group
diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
new file mode 100644
index 0000000..c64b5e7
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.kafka.Config;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class KafkaSourceConnectorTest {
+    KafkaSourceConnector connector = new KafkaSourceConnector();
+
+    @Test
+    public void verifyAndSetConfigTest() {
+        KeyValue keyValue = new DefaultKeyValue();
+
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            assertEquals(connector.verifyAndSetConfig(keyValue), "Request Config key: " + requestKey);
+            keyValue.put(requestKey, requestKey);
+        }
+        assertEquals(connector.verifyAndSetConfig(keyValue), "");
+    }
+
+    @Test
+    public void taskClassTest() {
+        assertEquals(connector.taskClass(), KafkaSourceTask.class);
+    }
+
+    @Test
+    public void taskConfigsTest() {
+        assertEquals(connector.taskConfigs().get(0), null);
+        KeyValue keyValue = new DefaultKeyValue();
+        for (String requestKey : Config.REQUEST_CONFIG) {
+            keyValue.put(requestKey, requestKey);
+        }
+        connector.verifyAndSetConfig(keyValue);
+        assertEquals(connector.taskConfigs().get(0), keyValue);
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java
new file mode 100644
index 0000000..57239f6
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTaskTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connector;
+
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class KafkaSourceTaskTest {
+
+    @Test
+    public void pollTest() throws Exception {
+        KafkaSourceTask task = new KafkaSourceTask();
+        Field config = KafkaSourceTask.class.getDeclaredField("config");
+        config.setAccessible(true);
+
+        Collection<SourceDataEntry> list = task.poll();
+        Assert.assertEquals(list.size(), 1);
+
+        list = task.poll();
+        Assert.assertEquals(list.size(), 0);
+
+    }
+}
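
Note on the position encoding used by KafkaSourceTask in the diff above: poll() stores the source partition as the UTF-8 bytes of "topic-partition" and the source position as an 8-byte long, and getTopicPartition() later splits the key at the last '-'. The following is a minimal, dependency-free sketch of that round trip. The class name PartitionKeyCodec and its helper methods are hypothetical and are not part of the connector; the sketch returns plain strings rather than a Kafka TopicPartition so that it runs without kafka-clients.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the connector: illustrates the key/offset encoding only.
class PartitionKeyCodec {

    // Encodes "topic-partition" as UTF-8 bytes, like the sourcePartition built in poll().
    static ByteBuffer encodeKey(String topic, int partition) {
        return ByteBuffer.wrap((topic + "-" + partition).getBytes(StandardCharsets.UTF_8));
    }

    // Splits at the LAST '-' so topic names that contain dashes survive the round trip.
    // Returns {topic, partition}, or null when the key has no usable separator.
    static String[] decodeKey(ByteBuffer key) {
        String s = StandardCharsets.UTF_8.decode(key.asReadOnlyBuffer()).toString();
        int index = s.lastIndexOf('-');
        if (index < 1 || index == s.length() - 1) {
            return null;
        }
        // substring(0, index) keeps the whole topic name; the end index is exclusive.
        return new String[] {s.substring(0, index), s.substring(index + 1)};
    }

    // Stores the offset in an 8-byte buffer through a long view, as poll() does.
    static ByteBuffer encodeOffset(long offset) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
        buf.asLongBuffer().put(offset); // the view has its own position; buf stays at 0
        return buf;
    }

    // Reads the offset back the same way commitOffset() does.
    static long decodeOffset(ByteBuffer position) {
        return position.asLongBuffer().get();
    }

    public static void main(String[] args) {
        String[] tp = decodeKey(encodeKey("my-topic", 3));
        System.out.println("topic=" + tp[0] + " partition=" + tp[1]);
        System.out.println("offset=" + decodeOffset(encodeOffset(42L)));
    }
}
```

Splitting at the last dash matters because Kafka topic names may themselves contain '-', while the partition suffix never does.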

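Note on Config#load in the diff above: properties2Object derives the lookup key from each setter name by dropping "set" and lower-casing the next character, so setGroupId is looked up as "groupId". A dotted key such as "kafka.group.id" therefore would not be bound by this mechanism, as far as the diff shows. The sketch below reproduces only that derivation for String setters; SetterKeySketch, SampleConfig, keyFor and bind are hypothetical names used for illustration, and a plain Map stands in for the OpenMessaging KeyValue type.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the setter-name-to-key derivation; not the connector's code.
class SetterKeySketch {

    // Stand-in for Config: two String properties with public setters.
    public static class SampleConfig {
        private String bootstrapServers;
        private String groupId;

        public void setBootstrapServers(String v) { this.bootstrapServers = v; }
        public void setGroupId(String v) { this.groupId = v; }
        public String getBootstrapServers() { return bootstrapServers; }
        public String getGroupId() { return groupId; }
    }

    // Same derivation as properties2Object: drop "set", lower-case the next character.
    static String keyFor(String setterName) {
        return setterName.substring(3, 4).toLowerCase() + setterName.substring(4);
    }

    // Binds String-typed setters only; the original also parses numeric and boolean parameters.
    static void bind(Map<String, String> props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            String name = m.getName();
            if (!name.startsWith("set") || name.length() < 4) {
                continue;
            }
            Class<?>[] pt = m.getParameterTypes();
            if (pt.length != 1 || pt[0] != String.class) {
                continue;
            }
            String value = props.get(keyFor(name));
            if (value == null) {
                continue;
            }
            try {
                m.invoke(target, value);
            } catch (ReflectiveOperationException e) {
                // Unlike the original, which swallows every Throwable, surface the failure.
                throw new IllegalStateException("cannot apply " + name, e);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        SampleConfig config = new SampleConfig();

        props.put("kafka.group.id", "dotted-key-group");
        bind(props, config);
        System.out.println("after dotted key: " + config.getGroupId());

        props.put("groupId", "camel-case-group");
        bind(props, config);
        System.out.println("after camelCase key: " + config.getGroupId());
    }
}
```

In the diff itself the task reads its settings straight from the KeyValue through the Config constants, so this derivation only matters for callers that rely on load().
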
[rocketmq-connect] 06/07: [rocketmq-connect-kafka]: Completion method KafkaSourceTask#pause(), KafkaSourceTask#resume(). (#854)

Posted by zh...@apache.org.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 59ac9fd7b7fd11ad7943b5ef30a3f95b20acadd7
Author: 彭小漪 <64...@qq.com>
AuthorDate: Thu Dec 2 16:07:30 2021 +0800

    [rocketmq-connect-kafka]: Completion method KafkaSourceTask#pause(), KafkaSourceTask#resume(). (#854)
    
    [rocketmq-connect-kafka]: Normalized code style
---
 .../connect/kafka/connector/KafkaSourceConnector.java        |  2 +-
 .../rocketmq/connect/kafka/connector/KafkaSourceTask.java    | 12 +++++++-----
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
index 567a8e9..680df6e 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
@@ -42,7 +42,7 @@ public class KafkaSourceConnector extends SourceConnector {
 
         log.info("KafkaSourceConnector verifyAndSetConfig enter");
         for (String key : config.keySet()) {
-            log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key));
+            log.info("connector verifyAndSetConfig: key: {}, value: {}", key, config.getString(key));
         }
 
         for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
index 6122b0e..f077ac0 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -132,11 +132,13 @@ public class KafkaSourceTask extends SourceTask {
     @Override
     public void pause() {
         log.info("source task pause ...");
+        consumer.pause(currentTPList);
     }
 
     @Override
     public void resume() {
         log.info("source task resume ...");
+        consumer.resume(currentTPList);
     }
 
     public String toString() {
@@ -179,7 +181,7 @@ public class KafkaSourceTask extends SourceTask {
         log.info("commitOffset {} topic partition {}", KafkaSourceTask.this, tpList);
         List<ByteBuffer> topic_partition_list = new ArrayList<>();
         for (TopicPartition tp : tpList) {
-            topic_partition_list.add(ByteBuffer.wrap((tp.topic()+"-"+tp.partition()).getBytes()));
+            topic_partition_list.add(ByteBuffer.wrap((tp.topic() + "-" + tp.partition()).getBytes()));
         }
 
         Map<TopicPartition, OffsetAndMetadata> commitOffsets = new HashMap<>();
@@ -198,7 +200,7 @@ public class KafkaSourceTask extends SourceTask {
         }
 
         commitOffsets.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) ->
-                log.info("commitOffset {}, TopicPartition:{} offset:{}", KafkaSourceTask.this, entry.getKey(), entry.getValue()));
+                log.info("commitOffset {}, TopicPartition: {} offset: {}", KafkaSourceTask.this, entry.getKey(), entry.getValue()));
         if (!commitOffsets.isEmpty()) {
             if (isClose) {
                 consumer.commitSync(commitOffsets);
@@ -213,9 +215,9 @@ public class KafkaSourceTask extends SourceTask {
         @Override
         public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
             if (e != null) {
-                log.warn("commit async excepiton {}", e);
+                log.warn("commit async exception", e);
                 map.entrySet().stream().forEach((Map.Entry<TopicPartition, OffsetAndMetadata> entry) -> {
-                    log.warn("commit exception, TopicPartition:{} offset: {}", entry.getKey().toString(), entry.getValue().offset());
+                    log.warn("commit exception, TopicPartition: {} offset: {}", entry.getKey().toString(), entry.getValue().offset());
                 });
                 return;
             }
@@ -229,9 +231,9 @@ public class KafkaSourceTask extends SourceTask {
 
             currentTPList.clear();
             for (TopicPartition tp : partitions) {
+                log.info("onPartitionsAssigned TopicPartition {}", tp);
                 currentTPList.add(tp);
             }
-            currentTPList.stream().forEach((TopicPartition tp)-> log.info("onPartitionsAssigned  TopicPartition {}", tp));
         }
 
         @Override

[rocketmq-connect] 04/07: [ISSUE #341] Add wakeup before kafka consumer close to wakeup consumer poll (#342)

Posted by zh...@apache.org.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 598de5dbacd6e3c4c643f07bc64e0903447fd7b3
Author: jonnxu <jo...@163.com>
AuthorDate: Fri Jul 26 21:30:56 2019 +0800

    [ISSUE #341] Add wakeup before kafka consumer close to wakeup consumer poll (#342)
---
 .../org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java     | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
index d4b39e0..1f7ed00 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -122,6 +122,7 @@ public class KafkaSourceTask extends SourceTask {
         log.info("source task stop enter");
         try {
             commitOffset(currentTPList, true);
+            consumer.wakeup(); // wakeup poll in other thread
             consumer.close();
         } catch (Exception e) {
             log.warn("{} consumer {} close exception {}", this, consumer, e);
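
The one-line change above calls `consumer.wakeup()` before `consumer.close()` so that a `poll` blocked in another thread returns promptly. The following is a stdlib-only analogue of that pattern, not Kafka's API: `BlockingSource`, `poll`, `wakeup`, and `stopWakesPoller` are hypothetical names introduced for illustration. In the real client, `KafkaConsumer.wakeup()` makes the blocked `poll` throw a `WakeupException` in the polling thread rather than return a flag.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupSketch {

    /** Hypothetical stand-in for a consumer whose poll() blocks until data or a wakeup arrives. */
    static final class BlockingSource {
        private final Object lock = new Object();
        private boolean wakeupRequested = false;

        /** Blocks for up to timeoutMs; returns true if it was woken early, false on timeout. */
        boolean poll(long timeoutMs) throws InterruptedException {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            synchronized (lock) {
                while (!wakeupRequested) {
                    long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                    if (remainingMs <= 0) {
                        return false;
                    }
                    lock.wait(remainingMs);
                }
                // Consume the request so the next poll blocks again.
                wakeupRequested = false;
                return true;
            }
        }

        /** Safe to call from any thread; also takes effect if called before poll() starts. */
        void wakeup() {
            synchronized (lock) {
                wakeupRequested = true;
                lock.notifyAll();
            }
        }
    }

    /** Starts a poller with a long timeout, wakes it, and reports whether it exited early. */
    static boolean stopWakesPoller() {
        BlockingSource source = new BlockingSource();
        AtomicBoolean wokenEarly = new AtomicBoolean(false);
        Thread poller = new Thread(() -> {
            try {
                wokenEarly.set(source.poll(60_000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        source.wakeup(); // the stop path: wake the poller before releasing resources
        try {
            poller.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return wokenEarly.get() && !poller.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("poller woken early: " + stopWakesPoller());
    }
}
```

Without the wakeup call, the stopping thread in this sketch would wait out the full poll timeout before the poller noticed the shutdown.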

[rocketmq-connect] 01/07: Init rocketmq-kafka connect project

Posted by zh...@apache.org.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit d0ff0aa8e20cab6312a9e5b55e3025bfd927f949
Author: duheng.dh <du...@alibaba-inc.com>
AuthorDate: Tue Jun 11 14:22:26 2019 +0800

    Init rocketmq-kafka connect project
---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+

[rocketmq-connect] 03/07: Update README.md for rocketmq-connect-kafka (#338)

Posted by zh...@apache.org.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 9aa0640e5c84e7c4088a91ce88e349068806b5c2
Author: jonnxu <jo...@163.com>
AuthorDate: Wed Jul 24 09:27:45 2019 +0800

    Update README.md for rocketmq-connect-kafka (#338)
---
 README.md | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/README.md b/README.md
index 8b13789..d0917cb 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,30 @@
+**rocketmq-connect-kafka**
 
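+After the runtime has started, start the connector by sending an HTTP request to the runtime that carries the connector and task parameters.
+
+**Parameters**
+
+- **connector-class**: class name of the connector
+- **oms-driver-url**: RocketMQ address
+- **source-record-converter**: source record converter
+- **tasks.num**: number of tasks to start
+- **kafka.topics**: list of topics; separate multiple topics with a comma (",")
+- **kafka.group.id**: consumer group name; when running multiple connectors, keep the topic and group id consistent across them
+- **kafka.bootstrap.server**: Kafka address
+
+
+**Start the connector**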
+
+http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","oms-driver-url":"oms: rocketmq://127.0.0.1:9876/default:default","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"io.openmessaging.connect.runtime.converter.JsonConverter"}
+
+**Check the connector's running status**
+
+http://127.0.0.1:8081/connectors/connector-name/status
+
+**View the connector's configuration**
+
+http://127.0.0.1:8081/connectors/connector-name/config
+
+**Stop the connector**
+
+http://127.0.0.1:8081/connectors/connector-name/stop
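
The start URL in the README above carries the whole connector configuration as a JSON object in the `config` query parameter. A minimal sketch of assembling such a URL follows. `ConnectorUrlSketch`, `toJson`, and `startUrl` are hypothetical helper names, the keys and values are copied verbatim from the README, and percent-encoding the JSON assumes the runtime URL-decodes the `config` parameter (the README itself shows it unencoded).

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConnectorUrlSketch {

    /** Serializes a flat string map as a JSON object; escapes only backslashes and quotes. */
    static String toJson(Map<String, String> config) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
            first = false;
        }
        return sb.append('}').toString();
    }

    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds the start URL; the path layout mirrors the README example. */
    static String startUrl(String runtimeBase, String connectorName, Map<String, String> config) {
        // URLEncoder.encode(String, Charset) requires Java 10 or later.
        return runtimeBase + "/connectors/" + connectorName
                + "?config=" + URLEncoder.encode(toJson(config), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the keys in insertion order.
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector-class", "org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector");
        config.put("oms-driver-url", "oms: rocketmq://127.0.0.1:9876/default:default");
        config.put("tasks.num", "1");
        config.put("kafka.topics", "test1,test2");
        config.put("kafka.group.id", "group0");
        config.put("kafka.bootstrap.server", "127.0.0.1:9092");
        config.put("source-record-converter", "io.openmessaging.connect.runtime.converter.JsonConverter");

        System.out.println(startUrl("http://127.0.0.1:8081", "connector-name", config));
    }
}
```

Building the JSON from a map rather than by hand keeps the quoting consistent when a value such as `kafka.topics` itself contains commas.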