You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:12 UTC
[rocketmq-connect] 02/39: init commit (#309)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 208e4ed922ddb19efb56b3d924f9c870fcc8a322
Author: chuenfaiy <ch...@163.com>
AuthorDate: Fri Jul 12 21:33:26 2019 +0800
init commit (#309)
---
.gitignore | 14 ++
pom.xml | 79 +++++++++
.../apache/rocketmq/connector/RmqConstants.java | 32 ++++
.../rocketmq/connector/RmqSourceConnector.java | 129 +++++++++++++++
.../apache/rocketmq/connector/RmqSourceTask.java | 182 +++++++++++++++++++++
.../apache/rocketmq/connector/common/Utils.java | 28 ++++
.../rocketmq/connector/config/ConfigDefine.java | 48 ++++++
.../rocketmq/connector/config/ConfigUtil.java | 70 ++++++++
.../apache/rocketmq/connector/config/DataType.java | 25 +++
.../rocketmq/connector/config/TaskConfig.java | 88 ++++++++++
.../rocketmq/connector/schema/FieldName.java | 31 ++++
.../connector/strategy/DivideStrategyEnum.java | 23 +++
.../connector/strategy/DivideTaskByQueue.java | 51 ++++++
.../connector/strategy/DivideTaskByTopic.java | 49 ++++++
.../connector/strategy/TaskDivideStrategy.java | 28 ++++
15 files changed, 877 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..36716aa
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,14 @@
+*.iml
+*.class
+*.jar
+*dependency-reduced-pom.xml
+.classpath
+.project
+.settings/
+target/
+devenv
+*.log*
+*.iml
+.idea/
+*.versionsBackup
+.DS_Store
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 0c79eee..13a5af4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,4 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -8,5 +25,67 @@
<artifactId>rocketmq-connector</artifactId>
<version>1.0-SNAPSHOT</version>
+ <properties>
+ <rocketmq.version>4.4.0</rocketmq.version>
+ </properties>
+
+    <!-- Each groupId:artifactId pair is declared exactly once; Maven warns on duplicate declarations. -->
+    <dependencies>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.0.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.jooq</groupId>
+            <artifactId>joor</artifactId>
+            <version>0.9.6</version>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connect</artifactId>
+            <version>0.1.0-beta</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.51</version>
+        </dependency>
+    </dependencies>
</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqConstants.java b/src/main/java/org/apache/rocketmq/connector/RmqConstants.java
new file mode 100644
index 0000000..1278424
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/RmqConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector;
+
+/**
+ * String constants shared by the RocketMQ source connector, plus the helper that
+ * derives the partition identifier under which a queue's position is stored.
+ */
+public class RmqConstants {
+
+    public static final String BROKER_NAME = "brokerName";
+    public static final String TOPIC_NAME = "topic";
+    public static final String QUEUE_ID = "queueId";
+
+    /** JSON key under which the next offset to pull is recorded in a position entry. */
+    public static final String NEXT_POSITION = "nextPosition";
+
+    /**
+     * Builds a partition identifier by concatenating, without any separator, the
+     * broker name, the topic and the queue id (in that order).
+     *
+     * @param topic   topic name
+     * @param broker  broker name
+     * @param queueId queue id rendered as text
+     * @return {@code broker + topic + queueId}; a {@code null} argument contributes the text "null"
+     */
+    public static String getPartition(String topic, String broker, String queueId) {
+        return broker + topic + queueId;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
new file mode 100644
index 0000000..c919fa3
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connector.config.ConfigDefine;
+import org.apache.rocketmq.connector.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.connector.strategy.DivideTaskByQueue;
+import org.apache.rocketmq.connector.strategy.DivideTaskByTopic;
+import org.apache.rocketmq.connector.strategy.TaskDivideStrategy;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Source connector that discovers the readable queues of every topic on the source
+ * RocketMQ cluster and splits them into task configurations.
+ *
+ * The task-divide strategy is selected in {@link #verifyAndSetConfig(KeyValue)}, because
+ * the configuration is not available yet when the no-arg constructor runs.
+ */
+public class RmqSourceConnector extends SourceConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(RmqSourceConnector.class);
+
+    private KeyValue config;
+
+    /** Readable queues per topic, as seen by the last successful {@link #taskConfigs()} call. */
+    private Map<String, List<MessageQueue>> topicRouteMap;
+
+    private TaskDivideStrategy taskDivideStrategy;
+
+    public RmqSourceConnector() {
+        topicRouteMap = new HashMap<String, List<MessageQueue>>();
+    }
+
+    /**
+     * Checks that every key listed in {@link ConfigDefine#REQUEST_CONFIG} is present,
+     * stores the configuration and selects the task-divide strategy from it.
+     *
+     * @param config connector configuration
+     * @return an empty string on success, otherwise a message naming the first missing key
+     */
+    public String verifyAndSetConfig(KeyValue config) {
+
+        for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
+                return "Request config key: " + requestKey;
+            }
+        }
+        this.config = config;
+        this.taskDivideStrategy = chooseStrategy(config);
+        return "";
+    }
+
+    public void start() {
+    }
+
+    public void stop() {
+
+    }
+
+    public void pause() {
+
+    }
+
+    public void resume() {
+
+    }
+
+    /**
+     * @return the task implementation that consumes the configurations built by {@link #taskConfigs()}
+     */
+    public Class<? extends Task> taskClass() {
+        return RmqSourceTask.class;
+    }
+
+    /**
+     * Fetches all topics from the source cluster, collects their readable queues and
+     * hands the result to the selected strategy.
+     *
+     * The route map is rebuilt from scratch on every call so repeated calls do not
+     * accumulate duplicate queues. If discovery fails, the error is logged and the
+     * routes from the last successful call (initially none) are used.
+     *
+     * @return one configuration per task
+     */
+    public List<KeyValue> taskConfigs() {
+
+        String sourceNamesrv = this.config.getString(ConfigDefine.SOURCE_RMQ);
+        String storeTopic = this.config.getString(ConfigDefine.STORE_TOPIC);
+
+        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, sourceNamesrv);
+        RPCHook rpcHook = null;
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
+        try {
+            defaultMQAdminExt.start();
+            Map<String, List<MessageQueue>> freshRoutes = new HashMap<String, List<MessageQueue>>();
+            TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
+            for (String topic : topicList.getTopicList()) {
+                // Skip the configured store topic (the value, not the config key name).
+                if (topic.equals(storeTopic)) {
+                    continue;
+                }
+                TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+                List<MessageQueue> queues = new ArrayList<MessageQueue>();
+                for (QueueData qd : topicRouteData.getQueueDatas()) {
+                    if (PermName.isReadable(qd.getPerm())) {
+                        for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                            queues.add(new MessageQueue(topic, qd.getBrokerName(), i));
+                        }
+                    }
+                }
+                freshRoutes.put(topic, queues);
+            }
+            this.topicRouteMap = freshRoutes;
+        } catch (Exception e) {
+            log.error("fetch topic list error: ", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
+        }
+
+        return this.taskDivideStrategy.divide(this.topicRouteMap, sourceNamesrv, storeTopic);
+    }
+
+    /** Maps the configured strategy ordinal to an implementation; anything but BY_TOPIC means by-queue. */
+    private static TaskDivideStrategy chooseStrategy(KeyValue config) {
+        if (config.getInt(ConfigDefine.TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_TOPIC.ordinal()) {
+            return new DivideTaskByTopic();
+        }
+        return new DivideTaskByQueue();
+    }
+}
+
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
new file mode 100644
index 0000000..d71dc87
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.source.SourceTask;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connector.common.Utils;
+import org.apache.rocketmq.connector.config.ConfigUtil;
+import org.apache.rocketmq.connector.config.DataType;
+import org.apache.rocketmq.connector.config.TaskConfig;
+import org.apache.rocketmq.connector.schema.FieldName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+/**
+ * Source task that pulls messages from the queues of one source topic and turns each
+ * message into a {@link SourceDataEntry}.
+ *
+ * Offsets are tracked per queue in {@code mqOffsetMap}. The partition key used when an
+ * entry is emitted is derived from the queue the message was pulled from, which is the
+ * same key {@link #start(KeyValue)} uses to look up the stored position.
+ */
+public class RmqSourceTask extends SourceTask {
+
+    private static final Logger log = LoggerFactory.getLogger(RmqSourceTask.class);
+
+    private final String taskId;
+    private final TaskConfig config;
+    private final DefaultMQPullConsumer consumer;
+
+    /** Next offset to pull for every queue this task is responsible for. */
+    private Map<MessageQueue, Long> mqOffsetMap;
+
+    public RmqSourceTask() {
+        this.config = new TaskConfig();
+        this.consumer = new DefaultMQPullConsumer();
+        this.taskId = Utils.createTaskId(Thread.currentThread().getName());
+        mqOffsetMap = new HashMap<MessageQueue, Long>();
+    }
+
+    /**
+     * Dispatches to the poll routine matching the configured data type ordinal.
+     *
+     * @return the entries produced by this poll round, possibly empty
+     */
+    public Collection<SourceDataEntry> poll() {
+
+        int dataType = this.config.getDataType();
+        if (dataType == DataType.COMMON_MESSAGE.ordinal()) {
+            return pollCommonMessage();
+        } else if (dataType == DataType.TOPIC_CONFIG.ordinal()) {
+            return pollTopicConfig();
+        } else if (dataType == DataType.BROKER_CONFIG.ordinal()) {
+            return pollBrokerConfig();
+        } else {
+            return pollSubConfig();
+        }
+    }
+
+    /**
+     * Loads the task configuration, starts the pull consumer and records the starting
+     * offset of every queue assigned to this task. Failures are logged, not rethrown.
+     *
+     * @param config task configuration produced by the connector
+     */
+    public void start(KeyValue config) {
+        ConfigUtil.load(config, this.config);
+        this.consumer.setConsumerGroup(Utils.createGroupName(this.config.getSourceTopic()));
+        this.consumer.setNamesrvAddr(this.config.getSourceRocketmq());
+        try {
+            this.consumer.start();
+            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(this.config.getSourceTopic());
+            for (MessageQueue mq : mqs) {
+                if (isAssigned(mq)) {
+                    mqOffsetMap.put(mq, loadStartOffset(mq));
+                }
+            }
+        } catch (Exception e) {
+            log.error("consumer of task {} start failed. ", this.taskId, e);
+        }
+    }
+
+    public void stop() {
+        this.consumer.shutdown();
+    }
+
+    public void pause() {
+
+    }
+
+    public void resume() {
+
+    }
+
+    /**
+     * Tells whether a queue belongs to this task. With no configured queue id every
+     * queue of the topic is taken; otherwise the queue id must match and, when a broker
+     * name is configured, the broker must match as well.
+     */
+    private boolean isAssigned(MessageQueue mq) {
+        String queueId = this.config.getQueueId();
+        if (queueId == null || queueId.equals("")) {
+            return true;
+        }
+        if (Integer.parseInt(queueId) != mq.getQueueId()) {
+            return false;
+        }
+        String brokerName = this.config.getBrokerName();
+        return brokerName == null || brokerName.equals("") || brokerName.equals(mq.getBrokerName());
+    }
+
+    /**
+     * Resolves the first offset to pull for a queue: the stored position if one exists,
+     * else the configured next position, else 0. The value is computed per queue and
+     * never written back into the shared config, so one queue's position cannot leak
+     * into another queue.
+     */
+    private long loadStartOffset(MessageQueue mq) throws Exception {
+        ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
+            ByteBuffer.wrap(partitionKey(mq)));
+
+        if (null != positionInfo && positionInfo.array().length > 0) {
+            String positionJson = new String(positionInfo.array(), "UTF-8");
+            Long stored = JSONObject.parseObject(positionJson).getLong(RmqConstants.NEXT_POSITION);
+            if (stored != null) {
+                return stored;
+            }
+        }
+        Long configured = this.config.getNextPosition();
+        return configured != null ? configured : 0L;
+    }
+
+    /** Encodes the partition identifier of a queue as UTF-8 bytes. */
+    private static byte[] partitionKey(MessageQueue mq) throws Exception {
+        return RmqConstants.getPartition(mq.getTopic(), mq.getBrokerName(),
+            String.valueOf(mq.getQueueId())).getBytes("UTF-8");
+    }
+
+    /**
+     * Pulls up to 32 messages from every tracked queue and converts each message into a
+     * source data entry. A failure on one queue is logged and does not stop the others.
+     */
+    private Collection<SourceDataEntry> pollCommonMessage() {
+
+        List<SourceDataEntry> res = new ArrayList<SourceDataEntry>();
+
+        for (Map.Entry<MessageQueue, Long> tracked : this.mqOffsetMap.entrySet()) {
+            MessageQueue mq = tracked.getKey();
+            try {
+                PullResult pullResult = consumer.pull(mq, "*", tracked.getValue(), 32);
+                switch (pullResult.getPullStatus()) {
+                    case FOUND: {
+                        tracked.setValue(pullResult.getNextBeginOffset());
+                        JSONObject jsonObject = new JSONObject();
+                        jsonObject.put(RmqConstants.NEXT_POSITION, pullResult.getNextBeginOffset());
+
+                        List<MessageExt> msgs = pullResult.getMsgFoundList();
+                        for (MessageExt m : msgs) {
+                            Schema schema = new Schema();
+                            schema.setDataSource(this.config.getSourceRocketmq());
+                            schema.setName(this.config.getSourceTopic());
+                            schema.setFields(new ArrayList<Field>());
+                            schema.getFields().add(new Field(0, FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
+
+                            DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                            dataEntryBuilder.timestamp(System.currentTimeMillis())
+                                .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
+                            dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(m));
+                            // Key the position by the queue actually pulled so start() finds it again.
+                            SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                                ByteBuffer.wrap(partitionKey(mq)),
+                                ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
+                            );
+                            res.add(sourceDataEntry);
+                        }
+                        break;
+                    }
+                    default:
+                        // No messages: still adopt the offset reported by the broker so an
+                        // out-of-range starting offset gets corrected instead of retried forever.
+                        tracked.setValue(pullResult.getNextBeginOffset());
+                        break;
+                }
+            } catch (Exception e) {
+                log.error("Rocketmq connector task poll error, current config: {}", JSON.toJSONString(config), e);
+            }
+        }
+
+        return res;
+    }
+
+    /** Not implemented yet; always returns an empty list. */
+    private Collection<SourceDataEntry> pollTopicConfig() {
+        return new ArrayList<SourceDataEntry>();
+    }
+
+    /** Not implemented yet; always returns an empty list. */
+    private Collection<SourceDataEntry> pollBrokerConfig() {
+        return new ArrayList<SourceDataEntry>();
+    }
+
+    /** Not implemented yet; always returns an empty list. */
+    private Collection<SourceDataEntry> pollSubConfig() {
+        return new ArrayList<SourceDataEntry>();
+    }
+}
+
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
new file mode 100644
index 0000000..71f538c
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.common;
+
+/**
+ * Small naming helpers for the connector.
+ */
+public class Utils {
+
+    /**
+     * @param prefix text to put in front of the generated name
+     * @return {@code prefix}, an underscore and the current time in milliseconds
+     */
+    public static String createGroupName(String prefix) {
+        return stamp(prefix);
+    }
+
+    /**
+     * @param prefix text to put in front of the generated id
+     * @return {@code prefix}, an underscore and the current time in milliseconds
+     */
+    public static String createTaskId(String prefix) {
+        return stamp(prefix);
+    }
+
+    /** Appends "_" and the current epoch milliseconds to the given prefix. */
+    private static String stamp(String prefix) {
+        long now = System.currentTimeMillis();
+        return prefix + "_" + now;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
new file mode 100644
index 0000000..64e9c94
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.config;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Names of the configuration keys understood by the connector and its tasks.
+ */
+public class ConfigDefine {
+
+    public static final String SOURCE_RMQ = "sourceRocketmq";
+    public static final String STORE_TOPIC = "storeTopic";
+    public static final String TARGET_RMQ = "targetRocketmq";
+    public static final String DATA_TYPE = "dataType";
+    public static final String QUEUE_ID = "queueId";
+    public static final String TASK_DIVIDE_STRATEGY = "taskDivideStrategy";
+    public static final String BROKER_NAME = "brokerName";
+    public static final String SOURCE_TOPIC = "sourceTopic";
+
+    /** Keys that must be present in a connector configuration. */
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>();
+
+    static {
+        REQUEST_CONFIG.add(SOURCE_RMQ);
+        REQUEST_CONFIG.add(TARGET_RMQ);
+        REQUEST_CONFIG.add(STORE_TOPIC);
+        REQUEST_CONFIG.add(TASK_DIVIDE_STRATEGY);
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java
new file mode 100644
index 0000000..d2dd7da
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.config;
+
+import io.openmessaging.KeyValue;
+
+import java.lang.reflect.Method;
+
+/**
+ * Copies values from a {@link KeyValue} into a plain object through its public setters.
+ */
+public class ConfigUtil {
+
+    /**
+     * Populates {@code object} from {@code props}.
+     *
+     * @param props  source of the textual property values
+     * @param object target whose public {@code setXxx} methods are invoked
+     */
+    public static <T> void load(KeyValue props, Object object) {
+
+        properties2Object(props, object);
+    }
+
+    /**
+     * For every public method whose name starts with "set", derives the property key by
+     * dropping "set" and lower-casing the next character, reads that key as a string and,
+     * if a value is present, converts it to the type of the setter's first parameter and
+     * invokes the setter. Any failure for a given setter is ignored on purpose
+     * (best-effort loading) and processing continues with the next one.
+     */
+    private static <T> void properties2Object(final KeyValue p, final Object object) {
+
+        for (Method setter : object.getClass().getMethods()) {
+            String name = setter.getName();
+            if (!name.startsWith("set")) {
+                continue;
+            }
+            try {
+                String key = name.substring(3, 4).toLowerCase() + name.substring(4);
+                String raw = p.getString(key);
+                if (raw == null) {
+                    continue;
+                }
+                Class<?>[] paramTypes = setter.getParameterTypes();
+                if (paramTypes == null || paramTypes.length == 0) {
+                    continue;
+                }
+                Object arg = convert(paramTypes[0].getSimpleName(), raw);
+                if (arg == null) {
+                    // Parameter type is not one of the supported kinds.
+                    continue;
+                }
+                setter.invoke(object, arg);
+            } catch (Throwable ignored) {
+            }
+        }
+    }
+
+    /**
+     * Parses {@code raw} according to a simple type name.
+     *
+     * @return the parsed value, or {@code null} when the type name is not supported
+     */
+    private static Object convert(String typeName, String raw) {
+        if ("int".equals(typeName) || "Integer".equals(typeName)) {
+            return Integer.parseInt(raw);
+        }
+        if ("long".equals(typeName) || "Long".equals(typeName)) {
+            return Long.parseLong(raw);
+        }
+        if ("double".equals(typeName) || "Double".equals(typeName)) {
+            return Double.parseDouble(raw);
+        }
+        if ("boolean".equals(typeName) || "Boolean".equals(typeName)) {
+            return Boolean.parseBoolean(raw);
+        }
+        if ("float".equals(typeName) || "Float".equals(typeName)) {
+            return Float.parseFloat(raw);
+        }
+        if ("String".equals(typeName)) {
+            return raw;
+        }
+        return null;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/DataType.java b/src/main/java/org/apache/rocketmq/connector/config/DataType.java
new file mode 100644
index 0000000..3e77a3a
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/config/DataType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.config;
+
+/**
+ * Kinds of data a source task can poll.
+ *
+ * The ordinal of a constant is the value written into the task configuration by the
+ * divide strategies and compared against in RmqSourceTask.poll(), so the declaration
+ * order must not be changed.
+ */
+public enum DataType {
+
+ // Handled by RmqSourceTask.pollCommonMessage().
+ COMMON_MESSAGE,
+ // Handled by RmqSourceTask.pollTopicConfig().
+ TOPIC_CONFIG,
+ // Handled by RmqSourceTask.pollBrokerConfig().
+ BROKER_CONFIG,
+ // Handled by RmqSourceTask.pollSubConfig(); poll() also falls back to it for any other value.
+ SUB_CONFIG
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
new file mode 100644
index 0000000..7bba3fb
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.config;
+
+/**
+ * Mutable holder for the settings of a single source task. It is populated through
+ * its setters by ConfigUtil.load.
+ */
+public class TaskConfig {
+
+    /** Value reported by {@link #getDataType()} when no data type was set (ordinal of DataType.COMMON_MESSAGE). */
+    private static final int DEFAULT_DATA_TYPE = 0;
+
+    private String storeTopic;
+    private String sourceTopic;
+    private String sourceRocketmq;
+    private Integer dataType;
+    private String brokerName;
+    private String queueId;
+    private Long nextPosition;
+
+    public String getStoreTopic() {
+        return storeTopic;
+    }
+
+    public void setStoreTopic(String storeTopic) {
+        this.storeTopic = storeTopic;
+    }
+
+    public String getSourceTopic() {
+        return sourceTopic;
+    }
+
+    public void setSourceTopic(String sourceTopic) {
+        this.sourceTopic = sourceTopic;
+    }
+
+    public String getSourceRocketmq() {
+        return sourceRocketmq;
+    }
+
+    public void setSourceRocketmq(String sourceRocketmq) {
+        this.sourceRocketmq = sourceRocketmq;
+    }
+
+    /**
+     * @return the configured data type ordinal, or 0 when none was set; the field is a
+     *         nullable {@code Integer}, so unboxing it directly would throw when unset
+     */
+    public int getDataType() {
+        return dataType == null ? DEFAULT_DATA_TYPE : dataType;
+    }
+
+    public void setDataType(int dataType) {
+        this.dataType = dataType;
+    }
+
+    public void setDataType(Integer dataType) {
+        this.dataType = dataType;
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public String getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(String queueId) {
+        this.queueId = queueId;
+    }
+
+    /**
+     * @return the next position, or {@code null} when none was set
+     */
+    public Long getNextPosition() {
+        return nextPosition;
+    }
+
+    public void setNextPosition(Long nextPosition) {
+        this.nextPosition = nextPosition;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/schema/FieldName.java b/src/main/java/org/apache/rocketmq/connector/schema/FieldName.java
new file mode 100644
index 0000000..9f4dc47
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/schema/FieldName.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.schema;
+
+/**
+ * Names of the fields the connector places in the schema of the entries it emits.
+ */
+public enum FieldName {
+
+    /** Field holding a message serialized as a JSON string; its key is "MessageExt". */
+    COMMON_MESSAGE("MessageExt");
+
+    private final String key;
+
+    FieldName(final String fieldKey) {
+        key = fieldKey;
+    }
+
+    /**
+     * @return the field name used in schemas and payloads
+     */
+    public String getKey() {
+        return this.key;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java
new file mode 100644
index 0000000..9dc060f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.strategy;
+
+/**
+ * Ways of splitting the source queues into tasks.
+ *
+ * RmqSourceConnector compares the integer value of the "taskDivideStrategy" setting
+ * with BY_TOPIC.ordinal(), so the declaration order is part of the configuration
+ * contract and must not be changed.
+ */
+public enum DivideStrategyEnum {
+
+ // Selects DivideTaskByTopic in RmqSourceConnector.
+ BY_TOPIC,
+ // RmqSourceConnector uses DivideTaskByQueue for any value other than BY_TOPIC's ordinal.
+ BY_QUEUE
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
new file mode 100644
index 0000000..b6eae8f
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.strategy;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connector.config.ConfigDefine;
+import org.apache.rocketmq.connector.config.DataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Divide strategy that creates one task configuration per message queue.
+ */
+public class DivideTaskByQueue extends TaskDivideStrategy {
+
+    /**
+     * Builds one configuration for every queue of every topic in the route map.
+     *
+     * @param topicRouteMap queues to replicate, grouped by topic name
+     * @param source        name-server address of the source cluster
+     * @param storeTopic    topic the produced entries are stored in
+     * @return the task configurations, one per queue
+     */
+    public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, String source, String storeTopic) {
+
+        List<KeyValue> config = new ArrayList<KeyValue>();
+
+        for (Map.Entry<String, List<MessageQueue>> route : topicRouteMap.entrySet()) {
+            String topic = route.getKey();
+            for (MessageQueue mq : route.getValue()) {
+                KeyValue keyValue = new DefaultKeyValue();
+                // STORE_TOPIC is set once, to the configured store topic; it must not be
+                // overwritten with the source topic name.
+                keyValue.put(ConfigDefine.STORE_TOPIC, storeTopic);
+                keyValue.put(ConfigDefine.SOURCE_RMQ, source);
+                keyValue.put(ConfigDefine.BROKER_NAME, mq.getBrokerName());
+                keyValue.put(ConfigDefine.QUEUE_ID, String.valueOf(mq.getQueueId()));
+                keyValue.put(ConfigDefine.SOURCE_TOPIC, topic);
+                keyValue.put(ConfigDefine.DATA_TYPE, DataType.COMMON_MESSAGE.ordinal());
+                config.add(keyValue);
+            }
+        }
+
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
new file mode 100644
index 0000000..7c96b28
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.strategy;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connector.config.ConfigDefine;
+import org.apache.rocketmq.connector.config.DataType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Divides work so that each topic gets exactly one task configuration. */
+public class DivideTaskByTopic extends TaskDivideStrategy {
+    /** Returns one KeyValue per key of topicRouteMap; the queue lists themselves are not read. */
+    @Override
+    public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, String source, String storeTopic) {
+        List<KeyValue> config = new ArrayList<KeyValue>();
+        for (String sourceTopic : topicRouteMap.keySet()) {
+            KeyValue keyValue = new DefaultKeyValue();
+            // STORE_TOPIC must hold the destination only; never write the source topic under this key.
+            keyValue.put(ConfigDefine.STORE_TOPIC, storeTopic);
+            keyValue.put(ConfigDefine.SOURCE_RMQ, source);
+            // Topic-level tasks write BROKER_NAME and QUEUE_ID as empty strings.
+            keyValue.put(ConfigDefine.BROKER_NAME, "");
+            keyValue.put(ConfigDefine.QUEUE_ID, "");
+            keyValue.put(ConfigDefine.SOURCE_TOPIC, sourceTopic);
+            keyValue.put(ConfigDefine.DATA_TYPE, DataType.COMMON_MESSAGE.ordinal());
+            config.add(keyValue);
+        }
+        return config;
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
new file mode 100644
index 0000000..f847cb1
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connector.strategy;
+
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class TaskDivideStrategy {
+ /** Builds the list of per-task KeyValue configurations from the topic-to-queues map, source and store topic. */
+ public abstract List<KeyValue> divide(Map<String, List<MessageQueue>> topicMap, String source, String storeTopic);
+}