You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:44 UTC

[rocketmq-connect] 34/39: [ISSUE #699] [Replicator] The source task starts to check the position (#700)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit a0b10aad77b0cdfe980b7d6abb1dd3d369689447
Author: Git_Yang <30...@users.noreply.github.com>
AuthorDate: Wed Sep 22 14:09:10 2021 +0800

    [ISSUE #699] [Replicator] The source task starts to check the position (#700)
    
    Signed-off-by: zhangyang21 <zh...@xiaomi.com>
---
 .../apache/rocketmq/replicator/RmqSourceTask.java  | 83 +++++++++++++++++-----
 1 file changed, 67 insertions(+), 16 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index da7013a..87ed9a8 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.replicator;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.PositionStorageReader;
 import io.openmessaging.connector.api.data.DataEntryBuilder;
 import io.openmessaging.connector.api.data.EntryType;
 import io.openmessaging.connector.api.data.Field;
@@ -33,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -57,6 +59,8 @@ public class RmqSourceTask extends SourceTask {
     private final TaskConfig config;
     private final DefaultMQPullConsumer consumer;
     private volatile boolean started = false;
+    private final long TIMEOUT = 1000 * 60 * 10;
+    private final long WAIT_TIME = 1000 * 2;
 
     private Map<TaskTopicInfo, Long> mqOffsetMap;
 
@@ -93,27 +97,19 @@ public class RmqSourceTask extends SourceTask {
             }
 
             this.consumer.start();
+
+            List<TaskTopicInfo> topicListFilter = new ArrayList<>();
             for (TaskTopicInfo tti : topicList) {
                 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(tti.getTopic());
                 for (MessageQueue mq : mqs) {
-                    if (tti.getQueueId() == mq.getQueueId()) {
-                        ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
-                            ByteBuffer.wrap(RmqConstants.getPartition(
-                                mq.getTopic(),
-                                mq.getBrokerName(),
-                                String.valueOf(mq.getQueueId())).getBytes(StandardCharsets.UTF_8)));
-
-                        if (null != positionInfo && positionInfo.array().length > 0) {
-                            String positionJson = new String(positionInfo.array(), StandardCharsets.UTF_8);
-                            JSONObject jsonObject = JSONObject.parseObject(positionJson);
-                            this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
-                        } else {
-                            this.config.setNextPosition(0L);
-                        }
-                        mqOffsetMap.put(tti, this.config.getNextPosition());
+                    if (tti.getBrokerName().equals(mq.getBrokerName()) && tti.getQueueId() == mq.getQueueId()) {
+                        topicListFilter.add(tti);
+                        break;
                     }
                 }
             }
+            PositionStorageReader positionStorageReader = this.context.positionStorageReader();
+            mqOffsetMap.putAll(getPositionMapWithCheck(topicListFilter, positionStorageReader, this.TIMEOUT, TimeUnit.MILLISECONDS));
             started = true;
         } catch (Exception e) {
             log.error("Consumer of task {} start failed.", this.taskId, e);
@@ -163,7 +159,7 @@ public class RmqSourceTask extends SourceTask {
                             for (MessageExt msg : msgs) {
                                 DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
                                 dataEntryBuilder.timestamp(System.currentTimeMillis())
-                                        .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
+                                    .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
                                 dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
                                 SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
                                     ByteBuffer.wrap(RmqConstants.getPartition(
@@ -205,5 +201,60 @@ public class RmqSourceTask extends SourceTask {
     private Collection<SourceDataEntry> pollSubConfig() {
         return new ArrayList<>();
     }
+
+    /**
+     * Reads the stored positions for the given tasks repeatedly until two consecutive
+     * reads return the same values, or until the timeout elapses.
+     *
+     * <p>The method always sleeps {@code WAIT_TIME} milliseconds and re-reads at least once,
+     * even when {@code timeout} is zero or negative. Whenever a re-read differs from the
+     * previous snapshot, the newer snapshot becomes the baseline for the next comparison.
+     *
+     * <p>If the calling thread is interrupted while sleeping, the interrupt status is
+     * restored and the most recent snapshot is returned immediately.
+     *
+     * @param taskList              tasks whose positions are looked up
+     * @param positionStorageReader reader used to fetch the stored positions
+     * @param timeout               maximum time to keep polling, expressed in {@code unit}
+     * @param unit                  unit of {@code timeout}; {@code null} means milliseconds
+     * @return the most recently accepted snapshot of task -> next position
+     */
+    public Map<TaskTopicInfo, Long> getPositionMapWithCheck(List<TaskTopicInfo> taskList,
+        PositionStorageReader positionStorageReader, long timeout, TimeUnit unit) {
+        unit = unit == null ? TimeUnit.MILLISECONDS : unit;
+
+        Map<TaskTopicInfo, Long> positionMap = getPositionMap(taskList, positionStorageReader);
+
+        long msecs = unit.toMillis(timeout);
+        // With a non-positive timeout startTime stays 0, which makes the remaining
+        // time computed below negative so the loop runs exactly once.
+        long startTime = msecs <= 0L ? 0L : System.currentTimeMillis();
+        long waitTime;
+        boolean waitPositionReady;
+        do {
+            try {
+                Thread.sleep(this.WAIT_TIME);
+            } catch (InterruptedException e) {
+                log.error("Thread sleep error.", e);
+                // Restore the interrupt status for the caller and stop polling; continuing
+                // would ignore the interruption for up to the whole timeout.
+                Thread.currentThread().interrupt();
+                break;
+            }
+
+            Map<TaskTopicInfo, Long> positionMapCmp = getPositionMap(taskList, positionStorageReader);
+            waitPositionReady = true;
+            for (Map.Entry<TaskTopicInfo, Long> positionEntry : positionMap.entrySet()) {
+                // Compare null-safely (null counts as 0) so a missing or null value
+                // cannot trigger a NullPointerException through auto-unboxing.
+                Long previousValue = positionEntry.getValue();
+                Long currentValue = positionMapCmp.get(positionEntry.getKey());
+                long previous = previousValue == null ? 0L : previousValue;
+                long current = currentValue == null ? 0L : currentValue;
+                if (current != previous) {
+                    waitPositionReady = false;
+                    positionMap = positionMapCmp;
+                    break;
+                }
+            }
+
+            waitTime = msecs - (System.currentTimeMillis() - startTime);
+        } while (!waitPositionReady && waitTime > 0L);
+
+        return positionMap;
+    }
+
+    /**
+     * Takes one snapshot of the stored next-position for every task in {@code taskList}.
+     *
+     * <p>The storage key of a task is the UTF-8 encoding of
+     * {@code RmqConstants.getPartition(topic, brokerName, queueId)}. The stored value is
+     * decoded as UTF-8 JSON and its {@code RmqConstants.NEXT_POSITION} entry is used.
+     * A task with no stored value, an empty value, or a value lacking that entry is
+     * mapped to {@code 0L}.
+     *
+     * @param taskList              tasks whose positions are looked up
+     * @param positionStorageReader reader used to fetch the stored positions
+     * @return a new map of task -> next position; values are never {@code null}
+     */
+    public Map<TaskTopicInfo, Long> getPositionMap(List<TaskTopicInfo> taskList,
+        PositionStorageReader positionStorageReader) {
+        Map<TaskTopicInfo, Long> positionMap = new HashMap<>();
+        for (TaskTopicInfo tti : taskList) {
+            ByteBuffer positionInfo = positionStorageReader.getPosition(
+                ByteBuffer.wrap(RmqConstants.getPartition(
+                    tti.getTopic(),
+                    tti.getBrokerName(),
+                    String.valueOf(tti.getQueueId())).getBytes(StandardCharsets.UTF_8)));
+
+            if (null != positionInfo && positionInfo.array().length > 0) {
+                String positionJson = new String(positionInfo.array(), StandardCharsets.UTF_8);
+                JSONObject jsonObject = JSONObject.parseObject(positionJson);
+                // The stored JSON may lack NEXT_POSITION (getLong then yields null) or may
+                // parse to null; fall back to 0 so callers never unbox a null value.
+                Long nextPosition = jsonObject == null ? null : jsonObject.getLong(RmqConstants.NEXT_POSITION);
+                positionMap.put(tti, nextPosition == null ? 0L : nextPosition);
+            } else {
+                positionMap.put(tti, 0L);
+            }
+        }
+
+        return positionMap;
+    }
 }