You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:21:41 UTC

[rocketmq-connect] 05/07: [ISSUE #420] Remove openmessaging-connect-runtime dependency

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit e74c022c593a99b5c79d8e8a774254e7ef44f01a
Author: sanchen <sa...@chenyiliu.com>
AuthorDate: Sun Oct 6 16:42:42 2019 +0800

    [ISSUE #420] Remove openmessaging-connect-runtime dependency
---
 README.md                                          |  2 +-
 pom.xml                                            |  5 --
 .../connect/kafka/config/ConfigDefine.java         | 67 ++++++++++++++++++++++
 .../kafka/{Config.java => config/ConfigUtil.java}  | 58 ++-----------------
 .../kafka/connector/KafkaSourceConnector.java      | 35 +++++------
 .../connect/kafka/connector/KafkaSourceTask.java   |  8 +--
 src/main/resources/connect-kafka-source.properties |  1 -
 .../kafka/connector/KafkaSourceConnectorTest.java  | 15 ++---
 8 files changed, 104 insertions(+), 87 deletions(-)

diff --git a/README.md b/README.md
index d0917cb..213c8fa 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@
 
 **启动Connector**
 
-http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","oms-driver-url":"oms: rocketmq://127.0.0.1:9876/default:default","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"io.openmessaging.connect.runtime.converter.JsonConverter"}
+http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
 
 **查看Connector运行状态**
 
diff --git a/pom.xml b/pom.xml
index 08712ca..ccc4cc1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,11 +158,6 @@
         -->
         <dependency>
             <groupId>io.openmessaging</groupId>
-            <artifactId>openmessaging-connect-runtime</artifactId>
-            <version>0.0.1-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>io.openmessaging</groupId>
             <artifactId>openmessaging-connector</artifactId>
             <version>0.1.0-beta</version>
         </dependency>
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
new file mode 100644
index 0000000..9a7f1ba
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.config;
+
+import java.util.*;
+
+public class ConfigDefine {
+
+    public static String TASK_NUM = "tasks.num";
+    public static String TOPICS = "kafka.topics";
+    public static String GROUP_ID = "kafka.group.id";
+    public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server";
+    public static String CONNECTOR_CLASS = "connector-class";
+    public static String SOURCE_RECORD_CONVERTER = "source-record-converter";
+    public static String ROCKETMQ_TOPIC = "rocketmq.topic";
+
+    private String bootstrapServers;
+    private String topics;
+    private String groupId;
+
+    public String getTopics() {
+        return topics;
+    }
+
+    public void setTopics(String topics) {
+        this.topics = topics;
+    }
+
+    public String getBootstrapServers() {
+        return bootstrapServers;
+    }
+
+    public void setBootstrapServers(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
+        {
+            add(TOPICS);
+            add(GROUP_ID);
+            add(BOOTSTRAP_SERVER);
+        }
+    };
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
similarity index 66%
rename from src/main/java/org/apache/rocketmq/connect/kafka/Config.java
rename to src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
index 869597e..0587dae 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
@@ -14,62 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.rocketmq.connect.kafka;
+package org.apache.rocketmq.connect.kafka.config;
 
 import io.openmessaging.KeyValue;
-import java.lang.reflect.Method;
-import java.util.*;
-
-public class Config {
-
-    public static String TASK_NUM = "tasks.num";
-    public static String TOPICS = "kafka.topics";
-    public static String GROUP_ID = "kafka.group.id";
-    public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server";
-    public static String ROCKETMQ_TOPIC = "rocketmq.topic";
-
-    private String bootstrapServers;
-    private String topics;
-    private String groupId;
 
-    public String getTopics() {
-        return topics;
-    }
-
-    public void setTopics(String topics) {
-        this.topics = topics;
-    }
-
-    public String getBootstrapServers() {
-        return bootstrapServers;
-    }
+import java.lang.reflect.Method;
 
-    public void setBootstrapServers(String bootstrapServers) {
-        this.bootstrapServers = bootstrapServers;
-    }
+public class ConfigUtil {
 
-    public String getGroupId() {
-        return groupId;
-    }
+    public static <T> void load(KeyValue props, Object object) {
 
-    public void setGroupId(String groupId) {
-        this.groupId = groupId;
+        properties2Object(props, object);
     }
 
-    public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
-        {
-            add(TOPICS);
-            add(GROUP_ID);
-            add(BOOTSTRAP_SERVER);
-        }
-    };
-
-    public void load(KeyValue props) {
-        properties2Object(props, this);
-    }
-
-    private void properties2Object(final KeyValue p, final Object object) {
+    private static <T> void properties2Object(final KeyValue p, final Object object) {
 
         Method[] methods = object.getClass().getMethods();
         for (Method method : methods) {
@@ -109,8 +67,4 @@ public class Config {
             }
         }
     }
-
-    public static Set<String> getRequestConfig() {
-        return REQUEST_CONFIG;
-    }
 }
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
index ba30901..567a8e9 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
@@ -18,18 +18,17 @@
 package org.apache.rocketmq.connect.kafka.connector;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.connect.runtime.common.ConnectKeyValue;
-import io.openmessaging.connect.runtime.config.RuntimeConfigDefine;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
-import org.apache.rocketmq.connect.kafka.Config;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
-public class KafkaSourceConnector extends SourceConnector{
+public class KafkaSourceConnector extends SourceConnector {
     private static final Logger log = LoggerFactory.getLogger(KafkaSourceConnector.class);
 
     private KeyValue connectConfig;
@@ -42,12 +41,12 @@ public class KafkaSourceConnector extends SourceConnector{
     public String verifyAndSetConfig(KeyValue config) {
 
         log.info("KafkaSourceConnector verifyAndSetConfig enter");
-        for ( String key : config.keySet()) {
+        for (String key : config.keySet()) {
             log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key));
         }
 
-        for(String requestKey : Config.REQUEST_CONFIG){
-            if(!config.containsKey(requestKey)){
+        for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
                 return "Request Config key: " + requestKey;
             }
         }
@@ -82,20 +81,22 @@ public class KafkaSourceConnector extends SourceConnector{
 
     @Override
     public List<KeyValue> taskConfigs() {
+        if (connectConfig == null) {
+            return new ArrayList<KeyValue>();
+        }
 
         log.info("Source Connector taskConfigs enter");
         List<KeyValue> configs = new ArrayList<>();
-        int task_num = connectConfig.getInt(Config.TASK_NUM);
+        int task_num = connectConfig.getInt(ConfigDefine.TASK_NUM);
         log.info("Source Connector taskConfigs: task_num:" + task_num);
-        for (int i=0; i < task_num; ++i) {
-            KeyValue config = new ConnectKeyValue();
-            config.put(Config.BOOTSTRAP_SERVER, connectConfig.getString(Config.BOOTSTRAP_SERVER));
-            config.put(Config.TOPICS, connectConfig.getString(Config.TOPICS));
-            config.put(Config.GROUP_ID, connectConfig.getString(Config.GROUP_ID));
-
-            config.put(RuntimeConfigDefine.CONNECTOR_CLASS, connectConfig.getString(RuntimeConfigDefine.CONNECTOR_CLASS));
-            config.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER));
-            config.put(RuntimeConfigDefine.OMS_DRIVER_URL, connectConfig.getString(RuntimeConfigDefine.OMS_DRIVER_URL));
+        for (int i = 0; i < task_num; ++i) {
+            KeyValue config = new DefaultKeyValue();
+            config.put(ConfigDefine.BOOTSTRAP_SERVER, connectConfig.getString(ConfigDefine.BOOTSTRAP_SERVER));
+            config.put(ConfigDefine.TOPICS, connectConfig.getString(ConfigDefine.TOPICS));
+            config.put(ConfigDefine.GROUP_ID, connectConfig.getString(ConfigDefine.GROUP_ID));
+
+            config.put(ConfigDefine.CONNECTOR_CLASS, connectConfig.getString(ConfigDefine.CONNECTOR_CLASS));
+            config.put(ConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(ConfigDefine.SOURCE_RECORD_CONVERTER));
             configs.add(config);
         }
         return configs;
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
index 1f7ed00..6122b0e 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -23,7 +23,7 @@ import io.openmessaging.connector.api.data.*;
 import io.openmessaging.connector.api.source.SourceTask;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.rocketmq.connect.kafka.Config;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,8 +97,8 @@ public class KafkaSourceTask extends SourceTask {
         this.currentTPList = new ArrayList<>();
         this.config = taskConfig;
         Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(Config.BOOTSTRAP_SERVER));
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(Config.GROUP_ID));
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(ConfigDefine.BOOTSTRAP_SERVER));
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(ConfigDefine.GROUP_ID));
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
@@ -106,7 +106,7 @@ public class KafkaSourceTask extends SourceTask {
 
         this.consumer = new KafkaConsumer<>(props);
 
-        String topics = this.config.getString(Config.TOPICS);
+        String topics = this.config.getString(ConfigDefine.TOPICS);
         for (String topic : topics.split(",")) {
             if (!topic.isEmpty()) {
                 topicList.add(topic);
diff --git a/src/main/resources/connect-kafka-source.properties b/src/main/resources/connect-kafka-source.properties
index f974cb9..5ab36ed 100644
--- a/src/main/resources/connect-kafka-source.properties
+++ b/src/main/resources/connect-kafka-source.properties
@@ -15,7 +15,6 @@
 
 name=rocketmq-connect-kafka
 connector-class=org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector
-oms-driver-url=oms:rocketmq://101.132.96.164:9876/default:default
 source-record-converter=io.openmessaging.connect.runtime.converter.JsonConverter
 task.num=2
 kafka.bootstrap.server=47.112.213.204:9092;47.112.213.204:9092
diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
index c64b5e7..1e12ca3 100644
--- a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.connect.kafka.connector;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.rocketmq.connect.kafka.Config;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -31,8 +31,8 @@ public class KafkaSourceConnectorTest {
     public void verifyAndSetConfigTest() {
         KeyValue keyValue = new DefaultKeyValue();
 
-        for (String requestKey : Config.REQUEST_CONFIG) {
-            assertEquals(connector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey);
+        for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+            assertEquals(connector.verifyAndSetConfig(keyValue), "Request Config key: " + requestKey);
             keyValue.put(requestKey, requestKey);
         }
         assertEquals(connector.verifyAndSetConfig(keyValue), "");
@@ -40,17 +40,18 @@ public class KafkaSourceConnectorTest {
 
     @Test
     public void taskClassTest() {
-        assertEquals(connector.taskClass(), KafkaSourceConnector.class);
+        assertEquals(connector.taskClass(), KafkaSourceTask.class);
     }
 
     @Test
     public void taskConfigsTest() {
-        assertEquals(connector.taskConfigs().get(0), null);
+        assertEquals(connector.taskConfigs().size(), 0);
         KeyValue keyValue = new DefaultKeyValue();
-        for (String requestKey : Config.REQUEST_CONFIG) {
+        for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
             keyValue.put(requestKey, requestKey);
         }
+        keyValue.put(ConfigDefine.TASK_NUM,1);
         connector.verifyAndSetConfig(keyValue);
-        assertEquals(connector.taskConfigs().get(0), keyValue);
+        assertEquals(connector.taskConfigs().get(0).getString(ConfigDefine.TOPICS), keyValue.getString(ConfigDefine.TOPICS));
     }
 }