You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:21:41 UTC
[rocketmq-connect] 05/07: [ISSUE #420]remove openmessage-runtime dependency
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit e74c022c593a99b5c79d8e8a774254e7ef44f01a
Author: sanchen <sa...@chenyiliu.com>
AuthorDate: Sun Oct 6 16:42:42 2019 +0800
[ISSUE #420]remove openmessage-runtime dependency
---
README.md | 2 +-
pom.xml | 5 --
.../connect/kafka/config/ConfigDefine.java | 67 ++++++++++++++++++++++
.../kafka/{Config.java => config/ConfigUtil.java} | 58 ++-----------------
.../kafka/connector/KafkaSourceConnector.java | 35 +++++------
.../connect/kafka/connector/KafkaSourceTask.java | 8 +--
src/main/resources/connect-kafka-source.properties | 1 -
.../kafka/connector/KafkaSourceConnectorTest.java | 15 ++---
8 files changed, 104 insertions(+), 87 deletions(-)
diff --git a/README.md b/README.md
index d0917cb..213c8fa 100644
--- a/README.md
+++ b/README.md
@@ -15,7 +15,7 @@
**启动Connector**
-http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","oms-driver-url":"oms: rocketmq://127.0.0.1:9876/default:default","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"io.openmessaging.connect.runtime.converter.JsonConverter"}
+http://127.0.0.1:8081/connectors/connector-name?config={"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector","tasks.num":"1","kafka.topics":"test1,test2","kafka.group.id":"group0","kafka.bootstrap.server":"127.0.0.1:9092","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
**查看Connector运行状态**
diff --git a/pom.xml b/pom.xml
index 08712ca..ccc4cc1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,11 +158,6 @@
-->
<dependency>
<groupId>io.openmessaging</groupId>
- <artifactId>openmessaging-connect-runtime</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>io.openmessaging</groupId>
<artifactId>openmessaging-connector</artifactId>
<version>0.1.0-beta</version>
</dependency>
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
new file mode 100644
index 0000000..9a7f1ba
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.config;
+
+import java.util.*;
+
+public class ConfigDefine {
+
+ public static String TASK_NUM = "tasks.num";
+ public static String TOPICS = "kafka.topics";
+ public static String GROUP_ID = "kafka.group.id";
+ public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server";
+ public static String CONNECTOR_CLASS = "connector-class";
+ public static String SOURCE_RECORD_CONVERTER = "source-record-converter";
+ public static String ROCKETMQ_TOPIC = "rocketmq.topic";
+
+ private String bootstrapServers;
+ private String topics;
+ private String groupId;
+
+ public String getTopics() {
+ return topics;
+ }
+
+ public void setTopics(String topics) {
+ this.topics = topics;
+ }
+
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+ public void setGroupId(String groupId) {
+ this.groupId = groupId;
+ }
+
+ public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
+ {
+ add(TOPICS);
+ add(GROUP_ID);
+ add(BOOTSTRAP_SERVER);
+ }
+ };
+}
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
similarity index 66%
rename from src/main/java/org/apache/rocketmq/connect/kafka/Config.java
rename to src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
index 869597e..0587dae 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/Config.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigUtil.java
@@ -14,62 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.rocketmq.connect.kafka;
+package org.apache.rocketmq.connect.kafka.config;
import io.openmessaging.KeyValue;
-import java.lang.reflect.Method;
-import java.util.*;
-
-public class Config {
-
- public static String TASK_NUM = "tasks.num";
- public static String TOPICS = "kafka.topics";
- public static String GROUP_ID = "kafka.group.id";
- public static String BOOTSTRAP_SERVER = "kafka.bootstrap.server";
- public static String ROCKETMQ_TOPIC = "rocketmq.topic";
-
- private String bootstrapServers;
- private String topics;
- private String groupId;
- public String getTopics() {
- return topics;
- }
-
- public void setTopics(String topics) {
- this.topics = topics;
- }
-
- public String getBootstrapServers() {
- return bootstrapServers;
- }
+import java.lang.reflect.Method;
- public void setBootstrapServers(String bootstrapServers) {
- this.bootstrapServers = bootstrapServers;
- }
+public class ConfigUtil {
- public String getGroupId() {
- return groupId;
- }
+ public static <T> void load(KeyValue props, Object object) {
- public void setGroupId(String groupId) {
- this.groupId = groupId;
+ properties2Object(props, object);
}
- public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
- {
- add(TOPICS);
- add(GROUP_ID);
- add(BOOTSTRAP_SERVER);
- }
- };
-
- public void load(KeyValue props) {
- properties2Object(props, this);
- }
-
- private void properties2Object(final KeyValue p, final Object object) {
+ private static <T> void properties2Object(final KeyValue p, final Object object) {
Method[] methods = object.getClass().getMethods();
for (Method method : methods) {
@@ -109,8 +67,4 @@ public class Config {
}
}
}
-
- public static Set<String> getRequestConfig() {
- return REQUEST_CONFIG;
- }
}
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
index ba30901..567a8e9 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnector.java
@@ -18,18 +18,17 @@
package org.apache.rocketmq.connect.kafka.connector;
import io.openmessaging.KeyValue;
-import io.openmessaging.connect.runtime.common.ConnectKeyValue;
-import io.openmessaging.connect.runtime.config.RuntimeConfigDefine;
import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.source.SourceConnector;
-import org.apache.rocketmq.connect.kafka.Config;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-public class KafkaSourceConnector extends SourceConnector{
+public class KafkaSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(KafkaSourceConnector.class);
private KeyValue connectConfig;
@@ -42,12 +41,12 @@ public class KafkaSourceConnector extends SourceConnector{
public String verifyAndSetConfig(KeyValue config) {
log.info("KafkaSourceConnector verifyAndSetConfig enter");
- for ( String key : config.keySet()) {
+ for (String key : config.keySet()) {
log.info("connector verifyAndSetConfig: key:{}, value:{}", key, config.getString(key));
}
- for(String requestKey : Config.REQUEST_CONFIG){
- if(!config.containsKey(requestKey)){
+ for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+ if (!config.containsKey(requestKey)) {
return "Request Config key: " + requestKey;
}
}
@@ -82,20 +81,22 @@ public class KafkaSourceConnector extends SourceConnector{
@Override
public List<KeyValue> taskConfigs() {
+ if (connectConfig == null) {
+ return new ArrayList<KeyValue>();
+ }
log.info("Source Connector taskConfigs enter");
List<KeyValue> configs = new ArrayList<>();
- int task_num = connectConfig.getInt(Config.TASK_NUM);
+ int task_num = connectConfig.getInt(ConfigDefine.TASK_NUM);
log.info("Source Connector taskConfigs: task_num:" + task_num);
- for (int i=0; i < task_num; ++i) {
- KeyValue config = new ConnectKeyValue();
- config.put(Config.BOOTSTRAP_SERVER, connectConfig.getString(Config.BOOTSTRAP_SERVER));
- config.put(Config.TOPICS, connectConfig.getString(Config.TOPICS));
- config.put(Config.GROUP_ID, connectConfig.getString(Config.GROUP_ID));
-
- config.put(RuntimeConfigDefine.CONNECTOR_CLASS, connectConfig.getString(RuntimeConfigDefine.CONNECTOR_CLASS));
- config.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER));
- config.put(RuntimeConfigDefine.OMS_DRIVER_URL, connectConfig.getString(RuntimeConfigDefine.OMS_DRIVER_URL));
+ for (int i = 0; i < task_num; ++i) {
+ KeyValue config = new DefaultKeyValue();
+ config.put(ConfigDefine.BOOTSTRAP_SERVER, connectConfig.getString(ConfigDefine.BOOTSTRAP_SERVER));
+ config.put(ConfigDefine.TOPICS, connectConfig.getString(ConfigDefine.TOPICS));
+ config.put(ConfigDefine.GROUP_ID, connectConfig.getString(ConfigDefine.GROUP_ID));
+
+ config.put(ConfigDefine.CONNECTOR_CLASS, connectConfig.getString(ConfigDefine.CONNECTOR_CLASS));
+ config.put(ConfigDefine.SOURCE_RECORD_CONVERTER, connectConfig.getString(ConfigDefine.SOURCE_RECORD_CONVERTER));
configs.add(config);
}
return configs;
diff --git a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
index 1f7ed00..6122b0e 100644
--- a/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceTask.java
@@ -23,7 +23,7 @@ import io.openmessaging.connector.api.data.*;
import io.openmessaging.connector.api.source.SourceTask;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
-import org.apache.rocketmq.connect.kafka.Config;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,8 +97,8 @@ public class KafkaSourceTask extends SourceTask {
this.currentTPList = new ArrayList<>();
this.config = taskConfig;
Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(Config.BOOTSTRAP_SERVER));
- props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(Config.GROUP_ID));
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.config.getString(ConfigDefine.BOOTSTRAP_SERVER));
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString(ConfigDefine.GROUP_ID));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteBufferDeserializer");
@@ -106,7 +106,7 @@ public class KafkaSourceTask extends SourceTask {
this.consumer = new KafkaConsumer<>(props);
- String topics = this.config.getString(Config.TOPICS);
+ String topics = this.config.getString(ConfigDefine.TOPICS);
for (String topic : topics.split(",")) {
if (!topic.isEmpty()) {
topicList.add(topic);
diff --git a/src/main/resources/connect-kafka-source.properties b/src/main/resources/connect-kafka-source.properties
index f974cb9..5ab36ed 100644
--- a/src/main/resources/connect-kafka-source.properties
+++ b/src/main/resources/connect-kafka-source.properties
@@ -15,7 +15,6 @@
name=rocketmq-connect-kafka
connector-class=org.apache.rocketmq.connect.kafka.connector.KafkaSourceConnector
-oms-driver-url=oms:rocketmq://101.132.96.164:9876/default:default
source-record-converter=io.openmessaging.connect.runtime.converter.JsonConverter
task.num=2
kafka.bootstrap.server=47.112.213.204:9092;47.112.213.204:9092
diff --git a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
index c64b5e7..1e12ca3 100644
--- a/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
+++ b/src/test/java/org/apache/rocketmq/connect/kafka/connector/KafkaSourceConnectorTest.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.connect.kafka.connector;
import io.openmessaging.KeyValue;
import io.openmessaging.internal.DefaultKeyValue;
-import org.apache.rocketmq.connect.kafka.Config;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -31,8 +31,8 @@ public class KafkaSourceConnectorTest {
public void verifyAndSetConfigTest() {
KeyValue keyValue = new DefaultKeyValue();
- for (String requestKey : Config.REQUEST_CONFIG) {
- assertEquals(connector.verifyAndSetConfig(keyValue), "Request config key: " + requestKey);
+ for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+ assertEquals(connector.verifyAndSetConfig(keyValue), "Request Config key: " + requestKey);
keyValue.put(requestKey, requestKey);
}
assertEquals(connector.verifyAndSetConfig(keyValue), "");
@@ -40,17 +40,18 @@ public class KafkaSourceConnectorTest {
@Test
public void taskClassTest() {
- assertEquals(connector.taskClass(), KafkaSourceConnector.class);
+ assertEquals(connector.taskClass(), KafkaSourceTask.class);
}
@Test
public void taskConfigsTest() {
- assertEquals(connector.taskConfigs().get(0), null);
+ assertEquals(connector.taskConfigs().size(), 0);
KeyValue keyValue = new DefaultKeyValue();
- for (String requestKey : Config.REQUEST_CONFIG) {
+ for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
keyValue.put(requestKey, requestKey);
}
+ keyValue.put(ConfigDefine.TASK_NUM,1);
connector.verifyAndSetConfig(keyValue);
- assertEquals(connector.taskConfigs().get(0), keyValue);
+ assertEquals(connector.taskConfigs().get(0).getString(ConfigDefine.TOPICS), keyValue.getString(ConfigDefine.TOPICS));
}
}