You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:44 UTC
[rocketmq-connect] 34/39: [ISSUE #699] [Replicator] The source task starts to check the position (#700)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit a0b10aad77b0cdfe980b7d6abb1dd3d369689447
Author: Git_Yang <30...@users.noreply.github.com>
AuthorDate: Wed Sep 22 14:09:10 2021 +0800
[ISSUE #699] [Replicator] The source task starts to check the position (#700)
Signed-off-by: zhangyang21 <zh...@xiaomi.com>
---
.../apache/rocketmq/replicator/RmqSourceTask.java | 83 +++++++++++++++++-----
1 file changed, 67 insertions(+), 16 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index da7013a..87ed9a8 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.replicator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.PositionStorageReader;
import io.openmessaging.connector.api.data.DataEntryBuilder;
import io.openmessaging.connector.api.data.EntryType;
import io.openmessaging.connector.api.data.Field;
@@ -33,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
@@ -57,6 +59,8 @@ public class RmqSourceTask extends SourceTask {
private final TaskConfig config;
private final DefaultMQPullConsumer consumer;
private volatile boolean started = false;
+ private final long TIMEOUT = 1000 * 60 * 10;
+ private final long WAIT_TIME = 1000 * 2;
private Map<TaskTopicInfo, Long> mqOffsetMap;
@@ -93,27 +97,19 @@ public class RmqSourceTask extends SourceTask {
}
this.consumer.start();
+
+ List<TaskTopicInfo> topicListFilter = new ArrayList<>();
for (TaskTopicInfo tti : topicList) {
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(tti.getTopic());
for (MessageQueue mq : mqs) {
- if (tti.getQueueId() == mq.getQueueId()) {
- ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
- ByteBuffer.wrap(RmqConstants.getPartition(
- mq.getTopic(),
- mq.getBrokerName(),
- String.valueOf(mq.getQueueId())).getBytes(StandardCharsets.UTF_8)));
-
- if (null != positionInfo && positionInfo.array().length > 0) {
- String positionJson = new String(positionInfo.array(), StandardCharsets.UTF_8);
- JSONObject jsonObject = JSONObject.parseObject(positionJson);
- this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
- } else {
- this.config.setNextPosition(0L);
- }
- mqOffsetMap.put(tti, this.config.getNextPosition());
+ if (tti.getBrokerName().equals(mq.getBrokerName()) && tti.getQueueId() == mq.getQueueId()) {
+ topicListFilter.add(tti);
+ break;
}
}
}
+ PositionStorageReader positionStorageReader = this.context.positionStorageReader();
+ mqOffsetMap.putAll(getPositionMapWithCheck(topicListFilter, positionStorageReader, this.TIMEOUT, TimeUnit.MILLISECONDS));
started = true;
} catch (Exception e) {
log.error("Consumer of task {} start failed.", this.taskId, e);
@@ -163,7 +159,7 @@ public class RmqSourceTask extends SourceTask {
for (MessageExt msg : msgs) {
DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
+ .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
ByteBuffer.wrap(RmqConstants.getPartition(
@@ -205,5 +201,60 @@ public class RmqSourceTask extends SourceTask {
private Collection<SourceDataEntry> pollSubConfig() {
return new ArrayList<>();
}
+
+    /**
+     * Reads the stored positions for {@code taskList} and keeps re-reading them until two
+     * consecutive snapshots agree, or until {@code timeout} has elapsed.
+     *
+     * <p>Between snapshots the calling thread sleeps for {@code WAIT_TIME} milliseconds. If a
+     * snapshot differs from the previous one, the newer snapshot becomes the baseline for the
+     * next comparison. A missing or {@code null} position is compared as {@code 0}.
+     *
+     * <p>If the thread is interrupted while sleeping, the interrupt flag is restored and the
+     * most recent snapshot is returned immediately instead of continuing to wait.
+     *
+     * @param taskList queues whose positions should be read
+     * @param positionStorageReader reader used to look the positions up
+     * @param timeout maximum time to wait for the positions to stop changing; a value
+     *     {@code <= 0} results in a single comparison pass
+     * @param unit unit of {@code timeout}; {@code null} is treated as milliseconds
+     * @return the last snapshot taken as the comparison baseline (stable, or simply the latest
+     *     one if the timeout expired or the thread was interrupted)
+     */
+    public Map<TaskTopicInfo, Long> getPositionMapWithCheck(List<TaskTopicInfo> taskList,
+        PositionStorageReader positionStorageReader, long timeout, TimeUnit unit) {
+        unit = unit == null ? TimeUnit.MILLISECONDS : unit;
+
+        Map<TaskTopicInfo, Long> positionMap = getPositionMap(taskList, positionStorageReader);
+        if (positionMap.isEmpty()) {
+            // Nothing to compare, so sleeping would only delay the caller.
+            return positionMap;
+        }
+
+        // Measure elapsed time with the monotonic clock so wall-clock adjustments cannot
+        // shorten or extend the wait. Subtracting start from now is overflow-safe.
+        final long timeoutNanos = unit.toNanos(timeout);
+        final long startNanos = System.nanoTime();
+        boolean waitPositionReady;
+        do {
+            try {
+                Thread.sleep(this.WAIT_TIME);
+            } catch (InterruptedException e) {
+                log.error("Thread sleep error.", e);
+                // Preserve the interrupt for the caller and stop waiting.
+                Thread.currentThread().interrupt();
+                break;
+            }
+
+            Map<TaskTopicInfo, Long> positionMapCmp = getPositionMap(taskList, positionStorageReader);
+            waitPositionReady = true;
+            for (Map.Entry<TaskTopicInfo, Long> positionEntry : positionMap.entrySet()) {
+                Long previous = positionEntry.getValue();
+                Long current = positionMapCmp.get(positionEntry.getKey());
+                // Treat absent/null positions as 0 rather than failing on unboxing.
+                long previousPosition = previous == null ? 0L : previous;
+                long currentPosition = current == null ? 0L : current;
+                if (previousPosition != currentPosition) {
+                    waitPositionReady = false;
+                    positionMap = positionMapCmp;
+                    break;
+                }
+            }
+        } while (!waitPositionReady && System.nanoTime() - startNanos < timeoutNanos);
+
+        return positionMap;
+    }
+
+    /**
+     * Takes one snapshot of the stored next-position for every entry in {@code taskList}.
+     *
+     * <p>The lookup key is the UTF-8 encoding of
+     * {@code RmqConstants.getPartition(topic, brokerName, queueId)}. The stored value is parsed
+     * as JSON and its {@code RmqConstants.NEXT_POSITION} field is used. When nothing is stored,
+     * the payload is empty, the payload does not parse to an object, or the field is absent,
+     * the position defaults to {@code 0}.
+     *
+     * @param taskList queues whose positions should be read
+     * @param positionStorageReader reader used to look the positions up
+     * @return a new map from each task entry to its next position; never contains null values
+     */
+    public Map<TaskTopicInfo, Long> getPositionMap(List<TaskTopicInfo> taskList,
+        PositionStorageReader positionStorageReader) {
+        Map<TaskTopicInfo, Long> positionMap = new HashMap<>();
+        for (TaskTopicInfo tti : taskList) {
+            ByteBuffer positionInfo = positionStorageReader.getPosition(
+                ByteBuffer.wrap(RmqConstants.getPartition(
+                    tti.getTopic(),
+                    tti.getBrokerName(),
+                    String.valueOf(tti.getQueueId())).getBytes(StandardCharsets.UTF_8)));
+
+            Long nextPosition = null;
+            // NOTE(review): array() exposes the whole backing array and ignores the buffer's
+            // position/limit; presumably the reader returns array-backed, fully-used buffers.
+            // Confirm against the PositionStorageReader implementation.
+            if (null != positionInfo && positionInfo.array().length > 0) {
+                String positionJson = new String(positionInfo.array(), StandardCharsets.UTF_8);
+                JSONObject jsonObject = JSONObject.parseObject(positionJson);
+                if (jsonObject != null) {
+                    // getLong yields null when the field is missing.
+                    nextPosition = jsonObject.getLong(RmqConstants.NEXT_POSITION);
+                }
+            }
+            // Never store null: callers unbox these values when comparing snapshots.
+            positionMap.put(tti, nextPosition == null ? 0L : nextPosition);
+        }
+
+        return positionMap;
+    }
}