You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:29 UTC
[rocketmq-connect] 19/39: feat(replicator):add consumer offset sync
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit a3217626bb4c35b5fc71e4a5b704601355a48c06
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Thu Oct 3 23:57:38 2019 +0800
feat(replicator):add consumer offset sync
- sync src cluster consumer groups offset to message store
- Add MetaSourceConnector as sourceConnector for task assign
- Add MetaSourceTask as SourceTask for offset read and message sink
- Add java 1.8 check
Closes #427
---
README.md | 12 ++
pom.xml | 7 +-
.../apache/rocketmq/replicator/MetaSourceTask.java | 163 +++++++++++++++++
.../apache/rocketmq/replicator/RmqConstants.java | 4 +
.../rocketmq/replicator/RmqMetaReplicator.java | 196 +++++++++++++++++++++
.../rocketmq/replicator/RmqSourceReplicator.java | 79 +++------
.../apache/rocketmq/replicator/RmqSourceTask.java | 40 +++--
.../apache/rocketmq/replicator/common/Utils.java | 50 +++++-
.../rocketmq/replicator/config/ConfigDefine.java | 4 +
.../rocketmq/replicator/config/DataType.java | 3 +-
.../replicator/config/RmqConnectorConfig.java | 130 ++++++++++++++
.../rocketmq/replicator/config/TaskConfig.java | 29 +++
.../rocketmq/replicator/config/TaskConfigEnum.java | 3 +
.../replicator/config/TaskDivideConfig.java | 5 +-
.../rocketmq/replicator/offset/OffsetSync.java | 65 +++++++
.../replicator/offset/OffsetSyncStore.java | 90 ++++++++++
.../rocketmq/replicator/schema/FieldName.java | 3 +-
.../replicator/RmqSourceReplicatorTest.java | 7 +-
18 files changed, 809 insertions(+), 81 deletions(-)
diff --git a/README.md b/README.md
index 332db6e..538568b 100644
--- a/README.md
+++ b/README.md
@@ -16,6 +16,18 @@ http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}
http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}/stop
````
+## rocketmq-meta-connector启动
+````
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}
+?config={"connector-class":"org.apache.rocketmq.replicator.RmqMetaReplicator","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","replicator-store-topic":"replicatorTopic","offset.sync.topic":"syncTopic","taskDivideStrategy":"0","white-list":"TopicTest,TopicTest2","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+````
+
+
+## rocketmq-rocketmq-connector停止
+````
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}/stop
+````
+
## rocketmq-replicator参数说明
parameter | type | must | description | sample value
diff --git a/pom.xml b/pom.xml
index 51a6177..dee5b08 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,8 +30,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>6</source>
- <target>6</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
</plugins>
@@ -107,7 +107,7 @@
<id>release-all</id>
<build>
<plugins>
- <plugin>
+ <plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
@@ -128,6 +128,7 @@
<finalName>rocketmq-replicator-${project.version}</finalName>
</build>
</profile>
+
</profiles>
</project>
diff --git a/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
new file mode 100644
index 0000000..16bf464
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.replicator;
+
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
+import io.openmessaging.connector.api.source.SourceTask;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.replicator.common.ConstDefine;
+import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.ConfigUtil;
+import org.apache.rocketmq.replicator.config.TaskConfig;
+import org.apache.rocketmq.replicator.offset.OffsetSyncStore;
+import org.apache.rocketmq.replicator.schema.FieldName;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetaSourceTask extends SourceTask {
+
+ private static final Logger log = LoggerFactory.getLogger(RmqSourceTask.class);
+
+ private final String taskId;
+ private final TaskConfig config;
+ private DefaultMQAdminExt srcMQAdminExt;
+ private volatile boolean started = false;
+
+ private OffsetSyncStore store;
+
+ public MetaSourceTask() {
+ this.config = new TaskConfig();
+ this.taskId = Utils.createTaskId(Thread.currentThread().getName());
+ }
+
+ @Override public void start(KeyValue config) {
+ ConfigUtil.load(config, this.config);
+
+ startAdmin();
+
+ this.store = new OffsetSyncStore(this.srcMQAdminExt, this.config);
+ this.started = true;
+ }
+
+ @Override public void stop() {
+ if (started) {
+ started = false;
+ }
+ }
+
+ @Override public void pause() {
+
+ }
+
+ @Override public void resume() {
+
+ }
+
+ @Override public Collection<SourceDataEntry> poll() {
+ log.debug("polling...");
+ List<String> groups = JSONObject.parseArray(this.config.getTaskGroupList(), String.class);
+
+ if (groups == null) {
+ log.info("no group in task.");
+ try {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ return Collections.emptyList();
+ }
+ List<SourceDataEntry> res = new ArrayList<>();
+ for (String group : groups) {
+ ConsumeStats stats;
+ try {
+ stats = this.srcMQAdminExt.examineConsumeStats(group);
+ } catch (Exception e) {
+ log.error("admin get consumer info failed for consumer groups: " + group, e);
+ continue;
+ }
+
+ for (Map.Entry<MessageQueue, OffsetWrapper> offsetTable : stats.getOffsetTable().entrySet()) {
+
+ MessageQueue mq = offsetTable.getKey();
+ long srcOffset = offsetTable.getValue().getConsumerOffset();
+ long targetOffset = this.store.convertTargetOffset(mq, srcOffset);
+
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(RmqConstants.NEXT_POSITION, srcOffset);
+
+ Schema schema = new Schema();
+ schema.setDataSource(this.config.getSourceRocketmq());
+ schema.setName(mq.getTopic());
+ schema.setFields(new ArrayList<>());
+ schema.getFields().add(new Field(0,
+ FieldName.OFFSET.getKey(), FieldType.INT64));
+
+ DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+ dataEntryBuilder.timestamp(System.currentTimeMillis())
+ .queue(this.config.getStoreTopic())
+ .entryType(EntryType.UPDATE);
+ dataEntryBuilder.putFiled(FieldName.OFFSET.getKey(), targetOffset);
+
+ SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+ ByteBuffer.wrap(RmqConstants.getPartition(
+ mq.getTopic(),
+ mq.getBrokerName(),
+ String.valueOf(mq.getQueueId())).getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
+ );
+ String targetTopic = new StringBuilder().append(group).append("-").append(mq.getTopic())
+ .append("-").append(mq.getQueueId()).toString();
+ sourceDataEntry.setQueueName(targetTopic);
+ res.add(sourceDataEntry);
+ }
+ }
+ return res;
+ }
+
+ private void startAdmin() {
+ this.srcMQAdminExt = new DefaultMQAdminExt();
+ this.srcMQAdminExt.setNamesrvAddr(this.config.getSourceRocketmq());
+ this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.config.getSourceRocketmq()));
+ try {
+ this.srcMQAdminExt.start();
+ } catch (MQClientException e) {
+ log.error("start src mq admin failed.", e);
+ throw new IllegalStateException("start src mq admin failed");
+ }
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java b/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
index 4994abe..290ab1c 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
@@ -31,4 +31,8 @@ public class RmqConstants {
public static String getPartition(String topic, String broker, String queueId) {
return new StringBuilder().append(broker).append(topic).append(queueId).toString();
}
+
+ public static String getOffsetTag(String topic, String broker, String queueId, String group) {
+ return new StringBuilder().append(broker).append(topic).append(queueId).append(group).toString();
+ }
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
new file mode 100644
index 0000000..38e5af2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.Task;
+import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.replicator.common.ConstDefine;
+import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RmqMetaReplicator extends SourceConnector {
+ private static final Logger log = LoggerFactory.getLogger(RmqSourceReplicator.class);
+
+ private static final Set<String> INNER_CONSUMER_GROUPS = new HashSet<>();
+
+ private RmqConnectorConfig replicatorConfig;
+
+ private volatile boolean configValid = false;
+ private Set<String> knownGroups;
+ private DefaultMQAdminExt srcMQAdminExt;
+ private volatile boolean adminStarted;
+ private ScheduledExecutorService executor;
+
+ static {
+ INNER_CONSUMER_GROUPS.add("TOOLS_CONSUMER");
+ INNER_CONSUMER_GROUPS.add("FILTERSRV_CONSUMER");
+ INNER_CONSUMER_GROUPS.add("__MONITOR_CONSUMER");
+ INNER_CONSUMER_GROUPS.add("CLIENT_INNER_PRODUCER");
+ INNER_CONSUMER_GROUPS.add("SELF_TEST_P_GROUP");
+ INNER_CONSUMER_GROUPS.add("SELF_TEST_C_GROUP");
+ INNER_CONSUMER_GROUPS.add("SELF_TEST_TOPIC");
+ INNER_CONSUMER_GROUPS.add("OFFSET_MOVED_EVENT");
+ INNER_CONSUMER_GROUPS.add("CID_ONS-HTTP-PROXY");
+ INNER_CONSUMER_GROUPS.add("CID_ONSAPI_PERMISSION");
+ INNER_CONSUMER_GROUPS.add("CID_ONSAPI_OWNER");
+ INNER_CONSUMER_GROUPS.add("CID_ONSAPI_PULL");
+ }
+
+ public RmqMetaReplicator() {
+ replicatorConfig = new RmqConnectorConfig();
+ knownGroups = new HashSet<>();
+ executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqMetaReplicator-SourceWatcher-%d").daemon(true).build());
+ }
+
+ @Override public String verifyAndSetConfig(KeyValue config) {
+ log.info("verifyAndSetConfig...");
+ try {
+ replicatorConfig.validate(config);
+ } catch (IllegalArgumentException e) {
+ return e.getMessage();
+ }
+
+ this.configValid = true;
+ return "";
+ }
+
+ @Override public void start() {
+ log.info("starting...");
+ startMQAdminTools();
+ executor.scheduleAtFixedRate(() ->
+ {
+ try {
+ refreshConsuemrGroups();
+ } catch (Exception e) {
+ log.error("refresh consumer groups failed.", e);
+ }
+ }, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
+ }
+
+ @Override public void stop() {
+ log.info("stopping...");
+ this.executor.shutdown();
+ }
+
+ @Override public void pause() {
+
+ }
+
+ @Override public void resume() {
+
+ }
+
+ @Override public Class<? extends Task> taskClass() {
+ return MetaSourceTask.class;
+ }
+
+ @Override public List<KeyValue> taskConfigs() {
+ log.debug("preparing taskConfig...");
+ if (!configValid) {
+ return new ArrayList<>();
+ }
+
+ startMQAdminTools();
+
+ try {
+ this.knownGroups = this.fetchConsumerGroups();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return Utils.groupPartitions(new ArrayList<>(this.knownGroups), this.replicatorConfig.getTaskParallelism(), replicatorConfig);
+ }
+
+ private synchronized void startMQAdminTools() {
+ if (!configValid || adminStarted) {
+ return;
+ }
+ RPCHook rpcHook = null;
+ this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ this.srcMQAdminExt.setNamesrvAddr(this.replicatorConfig.getSrcNamesrvs());
+ this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getSrcNamesrvs()));
+
+ try {
+ this.srcMQAdminExt.start();
+ log.info("RocketMQ srcMQAdminExt started");
+ } catch (MQClientException e) {
+ log.error("Replicator start failed for `srcMQAdminExt` exception.", e);
+ }
+ adminStarted = true;
+ }
+
+ private void refreshConsuemrGroups() throws InterruptedException, RemotingConnectException, MQBrokerException, RemotingTimeoutException, MQClientException, RemotingSendRequestException {
+ log.debug("refreshConsuemrGroups...");
+ Set<String> groups = fetchConsumerGroups();
+ Set<String> newGroups = new HashSet<>();
+ Set<String> deadGroups = new HashSet<>();
+ newGroups.addAll(groups);
+ newGroups.removeAll(knownGroups);
+ deadGroups.addAll(knownGroups);
+ deadGroups.removeAll(groups);
+ if (!newGroups.isEmpty() || !deadGroups.isEmpty()) {
+ log.info("reconfig consumer groups, new Groups: {} , dead groups: {}, previous groups: {}", newGroups, deadGroups, knownGroups);
+ knownGroups = groups;
+ context.requestTaskReconfiguration();
+ }
+ }
+
+ private Set<String> fetchConsumerGroups() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ return listGroups().stream().filter(this::skipInnerGroup).collect(Collectors.toSet());
+ }
+
+ private Set<String> listGroups() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+ Set<String> groups = new HashSet<>();
+ ClusterInfo clusterInfo = this.srcMQAdminExt.examineBrokerClusterInfo();
+ String[] addrs = clusterInfo.retrieveAllAddrByCluster(this.replicatorConfig.getSrcCluster());
+ for (String addr : addrs) {
+ ConsumeStatsList stats = this.srcMQAdminExt.fetchConsumeStatsInBroker(addr, true, 3 * 1000);
+ stats.getConsumeStatsList().stream().map(kv -> kv.keySet()).forEach(groups::addAll);
+ }
+ return groups;
+ }
+
+ private boolean skipInnerGroup(String group) {
+ if (INNER_CONSUMER_GROUPS.contains(group) || group.startsWith("CID_RMQ_SYS_") || group.startsWith("PositionManage") ||
+ group.startsWith("ConfigManage") || group.startsWith("OffsetManage")) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 429102e..c560840 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -47,12 +46,9 @@ import org.apache.rocketmq.replicator.common.ConstDefine;
import org.apache.rocketmq.replicator.common.Utils;
import org.apache.rocketmq.replicator.config.ConfigDefine;
import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
import org.apache.rocketmq.replicator.config.TaskDivideConfig;
import org.apache.rocketmq.replicator.config.TaskTopicInfo;
-import org.apache.rocketmq.replicator.strategy.DivideStrategyEnum;
-import org.apache.rocketmq.replicator.strategy.DivideTaskByQueue;
-import org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
-import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,18 +61,12 @@ public class RmqSourceReplicator extends SourceConnector {
private boolean syncRETRY = false;
- private KeyValue replicatorConfig;
+ private RmqConnectorConfig replicatorConfig;
private Map<String, List<TaskTopicInfo>> topicRouteMap;
- private TaskDivideStrategy taskDivideStrategy;
-
- private Set<String> whiteList;
-
private volatile boolean configValid = false;
- private int taskParallelism = 1;
-
private DefaultMQAdminExt srcMQAdminExt;
private DefaultMQAdminExt targetMQAdminExt;
@@ -86,7 +76,7 @@ public class RmqSourceReplicator extends SourceConnector {
public RmqSourceReplicator() {
topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
- whiteList = new HashSet<String>();
+ replicatorConfig = new RmqConnectorConfig();
executor = Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("RmqSourceReplicator-SourceWatcher-%d").daemon(true).build());
}
@@ -96,14 +86,14 @@ public class RmqSourceReplicator extends SourceConnector {
}
RPCHook rpcHook = null;
this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
- this.srcMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
+ this.srcMQAdminExt.setNamesrvAddr(this.replicatorConfig.getSrcNamesrvs());
this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
- this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
+ this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getSrcNamesrvs()));
this.targetMQAdminExt = new DefaultMQAdminExt(rpcHook);
- this.targetMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_RMQ));
+ this.targetMQAdminExt.setNamesrvAddr(this.replicatorConfig.getTargetNamesrvs());
this.targetMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
- this.targetMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_RMQ)));
+ this.targetMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getTargetNamesrvs()));
try {
this.srcMQAdminExt.start();
@@ -128,29 +118,11 @@ public class RmqSourceReplicator extends SourceConnector {
}
}
- // Check the whitelist, whitelist is required.
- String whileListStr = config.getString(ConfigDefine.CONN_WHITE_LIST);
- String[] wl = whileListStr.trim().split(",");
- if (wl.length <= 0)
- return "White list must be not empty.";
- else {
- for (String t : wl) {
- this.whiteList.add(t.trim());
- }
- }
-
- if (config.containsKey(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) &&
- config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
- this.taskDivideStrategy = new DivideTaskByQueue();
- } else {
- this.taskDivideStrategy = new DivideTaskByTopic();
- }
-
- if (config.containsKey(ConfigDefine.CONN_TASK_PARALLELISM)) {
- this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM);
+ try {
+ this.replicatorConfig.validate(config);
+ } catch (IllegalArgumentException e) {
+ return e.getMessage();
}
-
- this.replicatorConfig = config;
this.configValid = true;
return "";
}
@@ -173,7 +145,7 @@ public class RmqSourceReplicator extends SourceConnector {
context.requestTaskReconfiguration();
}
}
- }, 30, 30, TimeUnit.SECONDS);
+ }, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
}
public boolean compare(Map<String, List<TaskTopicInfo>> origin, Map<String, List<TaskTopicInfo>> updated) {
@@ -225,21 +197,22 @@ public class RmqSourceReplicator extends SourceConnector {
buildRoute();
TaskDivideConfig tdc = new TaskDivideConfig(
- this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
- this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
- this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
+ this.replicatorConfig.getSrcNamesrvs(),
+ this.replicatorConfig.getSrcCluster(),
+ this.replicatorConfig.getStoreTopic(),
+ this.replicatorConfig.getConverter(),
DataType.COMMON_MESSAGE.ordinal(),
- this.taskParallelism
+ this.replicatorConfig.getTaskParallelism()
);
- return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
+ return this.replicatorConfig.getTaskDivideStrategy().divide(this.topicRouteMap, tdc);
}
public void buildRoute() {
List<Pattern> patterns = new ArrayList<Pattern>();
- String srcCluster = this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_CLUSTER);
+ String srcCluster = this.replicatorConfig.getSrcCluster();
try {
Set<String> targetTopicSet = fetchTargetTopics();
- for (String topic : this.whiteList) {
+ for (String topic : this.replicatorConfig.getWhiteList()) {
Pattern pattern = Pattern.compile(topic);
patterns.add(pattern);
}
@@ -290,16 +263,12 @@ public class RmqSourceReplicator extends SourceConnector {
}
}
- public void setWhiteList(Set<String> whiteList) {
- this.whiteList = whiteList;
- }
-
public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
return this.topicRouteMap;
}
public Set<String> fetchTargetTopics() throws RemotingException, MQClientException, InterruptedException {
- String targetCluster = this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_CLUSTER);
+ String targetCluster = this.replicatorConfig.getTargetCluster();
TopicList targetTopics = this.targetMQAdminExt.fetchTopicsByCLuster(targetCluster);
return targetTopics.getTopicList();
}
@@ -317,8 +286,8 @@ public class RmqSourceReplicator extends SourceConnector {
*/
public void ensureTargetTopic(String srcTopic,
String targetTopic) throws RemotingException, MQClientException, InterruptedException {
- String srcCluster = this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_CLUSTER);
- String targetCluster = this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_CLUSTER);
+ String srcCluster = this.replicatorConfig.getSrcCluster();
+ String targetCluster = this.replicatorConfig.getTargetCluster();
List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, srcTopic, srcCluster);
if (brokerList.size() == 0) {
@@ -334,7 +303,7 @@ public class RmqSourceReplicator extends SourceConnector {
}
public String generateTargetTopic(String topic) {
- String fmt = this.replicatorConfig.getString(ConfigDefine.CONN_TOPIC_RENAME_FMT);
+ String fmt = this.replicatorConfig.getRenamePattern();
if (StringUtils.isNotEmpty(fmt)) {
Map<String, String> params = new HashMap<String, String>();
params.put("topic", topic);
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 42dbab8..d965898 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -19,8 +19,20 @@ package org.apache.rocketmq.replicator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.data.*;
+import io.openmessaging.connector.api.data.DataEntryBuilder;
+import io.openmessaging.connector.api.data.EntryType;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.connector.api.source.SourceTask;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
@@ -31,11 +43,11 @@ import org.apache.rocketmq.replicator.config.DataType;
import org.apache.rocketmq.replicator.config.TaskConfig;
import org.apache.rocketmq.replicator.config.TaskTopicInfo;
import org.apache.rocketmq.replicator.schema.FieldName;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.*;
public class RmqSourceTask extends SourceTask {
@@ -52,7 +64,7 @@ public class RmqSourceTask extends SourceTask {
this.config = new TaskConfig();
this.consumer = new DefaultMQPullConsumer();
this.taskId = Utils.createTaskId(Thread.currentThread().getName());
- mqOffsetMap = new HashMap<TaskTopicInfo, Long>();
+ mqOffsetMap = new HashMap<>();
}
public Collection<SourceDataEntry> poll() {
@@ -84,15 +96,15 @@ public class RmqSourceTask extends SourceTask {
for (TaskTopicInfo tti : topicList) {
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(tti.getTopic());
for (MessageQueue mq : mqs) {
- if (Integer.valueOf(tti.getQueueId()) == mq.getQueueId()) {
+ if (tti.getQueueId() == mq.getQueueId()) {
ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
ByteBuffer.wrap(RmqConstants.getPartition(
mq.getTopic(),
mq.getBrokerName(),
- String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
+ String.valueOf(mq.getQueueId())).getBytes(StandardCharsets.UTF_8)));
if (null != positionInfo && positionInfo.array().length > 0) {
- String positionJson = new String(positionInfo.array(), "UTF-8");
+ String positionJson = new String(positionInfo.array(), StandardCharsets.UTF_8);
JSONObject jsonObject = JSONObject.parseObject(positionJson);
this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
} else {
@@ -129,7 +141,7 @@ public class RmqSourceTask extends SourceTask {
private Collection<SourceDataEntry> pollCommonMessage() {
- List<SourceDataEntry> res = new ArrayList<SourceDataEntry>();
+ List<SourceDataEntry> res = new ArrayList<>();
if (started) {
try {
for (TaskTopicInfo taskTopicConfig : this.mqOffsetMap.keySet()) {
@@ -144,7 +156,7 @@ public class RmqSourceTask extends SourceTask {
Schema schema = new Schema();
schema.setDataSource(this.config.getSourceRocketmq());
schema.setName(taskTopicConfig.getTopic());
- schema.setFields(new ArrayList<Field>());
+ schema.setFields(new ArrayList<>());
schema.getFields().add(new Field(0,
FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
@@ -152,12 +164,13 @@ public class RmqSourceTask extends SourceTask {
dataEntryBuilder.timestamp(System.currentTimeMillis())
.queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(msgs));
+
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
ByteBuffer.wrap(RmqConstants.getPartition(
taskTopicConfig.getTopic(),
taskTopicConfig.getBrokerName(),
- String.valueOf(taskTopicConfig.getQueueId())).getBytes("UTF-8")),
- ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
+ String.valueOf(taskTopicConfig.getQueueId())).getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
);
sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
res.add(sourceDataEntry);
@@ -179,15 +192,16 @@ public class RmqSourceTask extends SourceTask {
}
private Collection<SourceDataEntry> pollTopicConfig() {
- return new ArrayList<SourceDataEntry>();
+ DefaultMQAdminExt srcMQAdminExt;
+ return new ArrayList<>();
}
private Collection<SourceDataEntry> pollBrokerConfig() {
- return new ArrayList<SourceDataEntry>();
+ return new ArrayList<>();
}
private Collection<SourceDataEntry> pollSubConfig() {
- return new ArrayList<SourceDataEntry>();
+ return new ArrayList<>();
}
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index f3e6be6..4134a2a 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.replicator.common;
+import com.alibaba.fastjson.JSONObject;
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -26,6 +29,9 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
+import org.apache.rocketmq.replicator.config.TaskConfigEnum;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.slf4j.Logger;
@@ -48,7 +54,7 @@ public class Utils {
public static String createInstanceName(String namesrvAddr) {
String[] namesrvArray = namesrvAddr.split(";");
- List<String> namesrvList = new ArrayList<String>();
+ List<String> namesrvList = new ArrayList<>();
for (String ns : namesrvArray) {
if (!namesrvList.contains(ns)) {
namesrvList.add(ns);
@@ -60,7 +66,7 @@ public class Utils {
public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
String cluster) throws RemotingException, MQClientException, InterruptedException {
- List<BrokerData> brokerList = new ArrayList<BrokerData>();
+ List<BrokerData> brokerList = new ArrayList<>();
TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
if (topicRouteData.getBrokerDatas() != null) {
@@ -79,7 +85,7 @@ public class Utils {
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
- log.info("create topic to %s success.%n", addr);
+ log.info("create topic to {} success.%n", addr);
}
if (topicConfig.isOrder()) {
@@ -94,12 +100,44 @@ public class Utils {
}
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
orderConf.toString(), true);
- log.info("set cluster orderConf=[%s]", orderConf);
+ log.info("set cluster orderConf=[{}]", orderConf);
}
-
- return;
} catch (Exception e) {
throw new IllegalArgumentException("create topic: " + topicConfig + "failed", e);
}
}
+
+ public static List<KeyValue> groupPartitions(List<String> elements, int numGroups, RmqConnectorConfig tdc) {
+ if (numGroups <= 0)
+ throw new IllegalArgumentException("Number of groups must be positive.");
+
+ List<KeyValue> result = new ArrayList<>(numGroups);
+
+ // Each group has either n+1 or n raw partitions
+ int perGroup = elements.size() / numGroups;
+ int leftover = elements.size() - (numGroups * perGroup);
+
+ int assigned = 0;
+ for (int group = 0; group < numGroups; group++) {
+ int numThisGroup = group < leftover ? perGroup + 1 : perGroup;
+ KeyValue keyValue = new DefaultKeyValue();
+ List<String> groupList = new ArrayList<>();
+ for (int i = 0; i < numThisGroup; i++) {
+ groupList.add(elements.get(assigned));
+ assigned++;
+ }
+ keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
+ keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSrcNamesrvs());
+ keyValue.put(TaskConfigEnum.TASK_SOURCE_CLUSTER.getKey(), tdc.getSrcCluster());
+ keyValue.put(TaskConfigEnum.TASK_OFFSET_SYNC_TOPIC.getKey(), tdc.getSrcCluster());
+ keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.OFFSET.ordinal());
+ keyValue.put(TaskConfigEnum.TASK_GROUP_INFO.getKey(), JSONObject.toJSONString(groupList));
+ keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getConverter());
+ result.add(keyValue);
+
+ log.debug("allocate group partition: {}", keyValue);
+ }
+
+ return result;
+ }
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
index 5b15821..a3514ee 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
@@ -43,6 +43,10 @@ public class ConfigDefine {
public static final String CONN_TOPIC_RENAME_FMT = "topic.rename.format";
+ public static final String REFRESH_INTERVAL = "refresh.interval";
+
+ public static final String OFFSET_SYNC_TOPIC = "offset.sync.topic";
+
/**
* The required key for all configurations.
*/
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/DataType.java b/src/main/java/org/apache/rocketmq/replicator/config/DataType.java
index 75f772e..60dc330 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/DataType.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/DataType.java
@@ -21,5 +21,6 @@ public enum DataType {
COMMON_MESSAGE,
TOPIC_CONFIG,
BROKER_CONFIG,
- SUB_CONFIG
+ SUB_CONFIG,
+ OFFSET
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java b/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
new file mode 100644
index 0000000..395d75d
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/replicator/config/RmqConnectorConfig.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator.config;
+
+import io.openmessaging.KeyValue;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.replicator.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.replicator.strategy.DivideTaskByQueue;
+import org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
+import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
+
+public class RmqConnectorConfig {
+
+ private int taskParallelism;
+ private Set<String> whiteList;
+ private String srcNamesrvs;
+ private String targetNamesrvs;
+ private String srcCluster;
+ private String targetCluster;
+ private TaskDivideStrategy taskDivideStrategy;
+ private String storeTopic;
+ private String converter;
+ private long refreshInterval;
+ private String renamePattern;
+ private String offsetSyncTopic;
+
+ public RmqConnectorConfig() {
+ }
+
+ public void validate(KeyValue config) {
+ this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM, 1);
+
+ int strategy = config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY, DivideStrategyEnum.BY_QUEUE.ordinal());
+ if (strategy == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+ this.taskDivideStrategy = new DivideTaskByQueue();
+ } else {
+ this.taskDivideStrategy = new DivideTaskByTopic();
+ }
+
+ buildWhiteList(config);
+
+ srcNamesrvs = config.getString(ConfigDefine.CONN_SOURCE_RMQ);
+ srcCluster = config.getString(ConfigDefine.CONN_SOURCE_CLUSTER);
+ targetNamesrvs = config.getString(ConfigDefine.CONN_TARGET_RMQ);
+ targetCluster = config.getString(ConfigDefine.CONN_TARGET_CLUSTER);
+
+ storeTopic = config.getString(ConfigDefine.CONN_STORE_TOPIC);
+ converter = config.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER);
+ refreshInterval = config.getLong(ConfigDefine.REFRESH_INTERVAL, 3);
+ renamePattern = config.getString(ConfigDefine.CONN_TOPIC_RENAME_FMT);
+ offsetSyncTopic = config.getString(ConfigDefine.OFFSET_SYNC_TOPIC);
+ }
+
+ private void buildWhiteList(KeyValue config) {
+ this.whiteList = new HashSet<String>();
+ String whileListStr = config.getString(ConfigDefine.CONN_WHITE_LIST, "");
+ String[] wl = whileListStr.trim().split(",");
+ if (wl.length <= 0)
+ throw new IllegalArgumentException("White list must be not empty.");
+ else {
+ this.whiteList.clear();
+ for (String t : wl) {
+ this.whiteList.add(t.trim());
+ }
+ }
+ }
+
+ public int getTaskParallelism() {
+ return this.taskParallelism;
+ }
+
+ public Set<String> getWhiteList() {
+ return this.whiteList;
+ }
+
+ public String getSrcNamesrvs() {
+ return this.srcNamesrvs;
+ }
+
+ public String getTargetNamesrvs() {
+ return this.targetNamesrvs;
+ }
+
+ public String getSrcCluster() {
+ return this.srcCluster;
+ }
+
+ public String getTargetCluster() {
+ return this.targetCluster;
+ }
+
+ public TaskDivideStrategy getTaskDivideStrategy() {
+ return this.taskDivideStrategy;
+ }
+
+ public String getStoreTopic() {
+ return this.storeTopic;
+ }
+
+ public String getConverter() {
+ return this.converter;
+ }
+
+ public long getRefreshInterval() {
+ return this.refreshInterval;
+ }
+
+ public String getRenamePattern() {
+ return this.renamePattern;
+ }
+
+ public String getOffsetSyncTopic() {
+ return this.offsetSyncTopic;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
index e85280f..15676ce 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
@@ -16,14 +16,19 @@
*/
package org.apache.rocketmq.replicator.config;
+import java.util.List;
+
public class TaskConfig {
+ private String sourceCluster;
private String storeTopic;
private String sourceGroup;
private String sourceRocketmq;
private Integer dataType;
private Long nextPosition;
private String taskTopicList;
+ private String taskGroupList;
+ private String offsetSyncTopic;
public String getSourceGroup() {
return sourceGroup;
@@ -73,7 +78,31 @@ public class TaskConfig {
return taskTopicList;
}
+ public void setTaskGroupList(String taskGroupList) {
+ this.taskGroupList = taskGroupList;
+ }
+
+ public String getTaskGroupList() {
+ return this.taskGroupList;
+ }
+
public void setTaskTopicList(String taskTopicList) {
this.taskTopicList = taskTopicList;
}
+
+ public void setSourceCluster(String sourceCluster) {
+ this.sourceCluster = sourceCluster;
+ }
+
+ public String getSourceCluster() {
+ return this.sourceCluster;
+ }
+
+ public void setOffsetSyncTopic(String offsetSyncTopic) {
+ this.offsetSyncTopic = offsetSyncTopic;
+ }
+
+ public String getOffsetSyncTopic() {
+ return offsetSyncTopic;
+ }
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
index 1516f7a..01c6787 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
@@ -21,6 +21,8 @@ public enum TaskConfigEnum {
TASK_ID("taskId"),
TASK_SOURCE_GROUP("sourceGroup"),
TASK_SOURCE_ROCKETMQ("sourceRocketmq"),
+ TASK_SOURCE_CLUSTER("sourceCluster"),
+ TASK_OFFSET_SYNC_TOPIC("offsetSyncTopic"),
TASK_SOURCE_TOPIC("sourceTopic"),
TASK_STORE_ROCKETMQ("storeTopic"),
TASK_DATA_TYPE("dataType"),
@@ -28,6 +30,7 @@ public enum TaskConfigEnum {
TASK_QUEUE_ID("queueId"),
TASK_NEXT_POSITION("nextPosition"),
TASK_TOPIC_INFO("taskTopicList"),
+ TASK_GROUP_INFO("taskGroupList"),
TASK_SOURCE_RECORD_CONVERTER("source-record-converter");
private String key;
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
index e6a8144..07d95c8 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
@@ -20,6 +20,8 @@ public class TaskDivideConfig {
private String sourceNamesrvAddr;
+ private String srcCluster;
+
private String storeTopic;
private String srcRecordConverter;
@@ -28,9 +30,10 @@ public class TaskDivideConfig {
private int taskParallelism;
- public TaskDivideConfig(String sourceNamesrvAddr, String storeTopic, String srcRecordConverter,
+ public TaskDivideConfig(String sourceNamesrvAddr, String srcCluster, String storeTopic, String srcRecordConverter,
int dataType, int taskParallelism) {
this.sourceNamesrvAddr = sourceNamesrvAddr;
+ this.srcCluster = srcCluster;
this.storeTopic = storeTopic;
this.srcRecordConverter = srcRecordConverter;
this.dataType = dataType;
diff --git a/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSync.java b/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSync.java
new file mode 100644
index 0000000..4678d5b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSync.java
@@ -0,0 +1,65 @@
+package org.apache.rocketmq.replicator.offset;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class OffsetSync {
+
+ private MessageQueue mq;
+ private long srcOffset;
+ private long targtOffset;
+
+ public OffsetSync(MessageQueue mq, long srcOffset, long targtOffset) {
+ this.mq = mq;
+ this.srcOffset = srcOffset;
+ this.targtOffset = targtOffset;
+ }
+
+ public void setMq(MessageQueue mq) {
+ this.mq = mq;
+ }
+
+ public void setSrcOffset(long srcOffset) {
+ this.srcOffset = srcOffset;
+ }
+
+ public void setTargtOffset(long targtOffset) {
+ this.targtOffset = targtOffset;
+ }
+
+ public long getSrcOffset() {
+ return this.srcOffset;
+ }
+
+ public long getTargtOffset() {
+ return this.targtOffset;
+ }
+
+ public MessageQueue getMq() {
+ return this.mq;
+ }
+
+ public byte[] encode() {
+ return JSON.toJSONBytes(this);
+ }
+
+ public static OffsetSync decode(byte[] body) {
+ OffsetSync sync = JSON.parseObject(body, OffsetSync.class);
+ return sync;
+ }
+}
diff --git a/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSyncStore.java b/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSyncStore.java
new file mode 100644
index 0000000..5703f39
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/replicator/offset/OffsetSyncStore.java
@@ -0,0 +1,90 @@
+package org.apache.rocketmq.replicator.offset;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.config.TaskConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+public class OffsetSyncStore {
+
+ private DefaultMQAdminExt adminExt;
+ private TaskConfig taskConfig;
+
+ private DefaultMQPullConsumer consumer;
+ private Map<MessageQueue, OffsetSync> syncs;
+ private long lastOffset;
+
+ public OffsetSyncStore(DefaultMQAdminExt adminExt,
+ TaskConfig taskConfig) {
+ this.adminExt = adminExt;
+ this.taskConfig = taskConfig;
+ this.syncs = new HashMap<MessageQueue, OffsetSync>();
+ this.consumer = new DefaultMQPullConsumer();
+ }
+
+ public long convertTargetOffset(MessageQueue mq, long srcOffset) {
+ OffsetSync offsetSync = latestOffsetSync(mq);
+ if (offsetSync.getSrcOffset() > srcOffset) {
+ return -1;
+ }
+ long delta = srcOffset - offsetSync.getSrcOffset();
+ return offsetSync.getTargtOffset() + delta;
+ }
+
+ private boolean sync(
+ Duration pullTimeout) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ TopicRouteData route = adminExt.examineTopicRouteInfo(taskConfig.getOffsetSyncTopic());
+ String brokerName = route.getQueueDatas().get(0).getBrokerName();
+ MessageQueue mq = new MessageQueue(taskConfig.getOffsetSyncTopic(), brokerName, 0);
+
+ PullResult pr = consumer.pull(mq, "", lastOffset, 0, pullTimeout.getNano() / Duration.ofMillis(1).getNano());
+ if (pr.getPullStatus() != PullStatus.FOUND) {
+ return false;
+ }
+ handle(pr);
+ return true;
+ }
+
+ private void handle(PullResult result) {
+ for (MessageExt msg : result.getMsgFoundList()) {
+ byte[] body = msg.getBody();
+ OffsetSync sync = OffsetSync.decode(body);
+ syncs.put(sync.getMq(), sync);
+ }
+ }
+
+ private OffsetSync latestOffsetSync(MessageQueue queue) {
+ return syncs.computeIfAbsent(queue, new Function<MessageQueue, OffsetSync>() {
+ @Override public OffsetSync apply(MessageQueue queue) {
+ return new OffsetSync(queue, -1, -1);
+ }
+ });
+ }
+
+}
diff --git a/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java b/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
index 913ffca..a8acd08 100644
--- a/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
+++ b/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
@@ -17,7 +17,8 @@
package org.apache.rocketmq.replicator.schema;
public enum FieldName {
- COMMON_MESSAGE("MessageExt");
+ COMMON_MESSAGE("MessageExt"),
+ OFFSET("Offset");
private String key;
diff --git a/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
index f271f14..795c386 100644
--- a/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
+++ b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -51,10 +52,14 @@ public class RmqSourceReplicatorTest {
@Test
public void testGenerateTopic() throws NoSuchFieldException {
RmqSourceReplicator rmqSourceReplicator = Mockito.spy(RmqSourceReplicator.class);
+
+ RmqConnectorConfig config = new RmqConnectorConfig();
KeyValue kv = new DefaultKeyValue();
kv.put(ConfigDefine.CONN_TOPIC_RENAME_FMT, "${topic}.replica");
+ config.validate(kv);
+
Field field = RmqSourceReplicator.class.getDeclaredField("replicatorConfig");
- FieldSetter.setField(rmqSourceReplicator, field, kv);
+ FieldSetter.setField(rmqSourceReplicator, field, config);
String dstTopic = rmqSourceReplicator.generateTargetTopic("dest");
assertThat(dstTopic).isEqualTo("dest.replica");
}