You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:22 UTC

[rocketmq-connect] 12/39: extend messageQueue for TaskTopicInfo

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit ad38ec76cc737aef32fc5c5e8533af6f19fc72b4
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Tue Sep 17 14:24:31 2019 +0800

    extend messageQueue for TaskTopicInfo
---
 .../rocketmq/replicator/RmqSourceReplicator.java   |  2 +-
 .../apache/rocketmq/replicator/RmqSourceTask.java  | 59 +++++++---------------
 .../rocketmq/replicator/config/TaskTopicInfo.java  | 40 ++-------------
 3 files changed, 23 insertions(+), 78 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index f24cf63..e124f15 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -228,7 +228,7 @@ public class RmqSourceReplicator extends SourceConnector {
                             for (QueueData qd : topicRouteData.getQueueDatas()) {
                                 if (brokerNameSet.contains(qd.getBrokerName())) {
                                     for (int i = 0; i < qd.getReadQueueNums(); i++) {
-                                        TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), String.valueOf(i), targetTopic);
+                                        TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, targetTopic);
                                         topicRouteMap.get(topic).add(taskTopicInfo);
                                     }
                                 }
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index c4d53fa..b16e585 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -47,6 +47,7 @@ public class RmqSourceTask extends SourceTask {
     private volatile boolean started = false;
 
     private Map<TaskTopicInfo, Long> mqOffsetMap;
+
     public RmqSourceTask() {
         this.config = new TaskConfig();
         this.consumer = new DefaultMQPullConsumer();
@@ -76,36 +77,15 @@ public class RmqSourceTask extends SourceTask {
 
         try {
             this.consumer.start();
-            for (TaskTopicInfo tti: topicList) {
-                Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(tti.getSourceTopic());
-                if (!tti.getQueueId().equals("")) {
-                    // divide task by queue
-                    for (MessageQueue mq: mqs) {
-                        if (Integer.valueOf(tti.getQueueId()) == mq.getQueueId()) {
-                            ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
-                                    ByteBuffer.wrap(RmqConstants.getPartition(
-                                            mq.getTopic(),
-                                            mq.getBrokerName(),
-                                            String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
-
-                            if (null != positionInfo && positionInfo.array().length > 0) {
-                                String positionJson = new String(positionInfo.array(), "UTF-8");
-                                JSONObject jsonObject = JSONObject.parseObject(positionJson);
-                                this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
-                            } else {
-                                this.config.setNextPosition(0L);
-                            }
-                            mqOffsetMap.put(tti, this.config.getNextPosition());
-                        }
-                    }
-                } else {
-                    // divide task by topic
-                    for (MessageQueue mq: mqs) {
+            for (TaskTopicInfo tti : topicList) {
+                Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(tti.getTopic());
+                for (MessageQueue mq : mqs) {
+                    if (Integer.valueOf(tti.getQueueId()) == mq.getQueueId()) {
                         ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
-                                ByteBuffer.wrap(RmqConstants.getPartition(
-                                        mq.getTopic(),
-                                        mq.getBrokerName(),
-                                        String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
+                            ByteBuffer.wrap(RmqConstants.getPartition(
+                                mq.getTopic(),
+                                mq.getBrokerName(),
+                                String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
 
                         if (null != positionInfo && positionInfo.array().length > 0) {
                             String positionJson = new String(positionInfo.array(), "UTF-8");
@@ -149,9 +129,8 @@ public class RmqSourceTask extends SourceTask {
         if (started) {
             try {
                 for (TaskTopicInfo taskTopicConfig : this.mqOffsetMap.keySet()) {
-                    MessageQueue mq = taskTopicConfig.convertMQ();
-                    PullResult pullResult = consumer.pull(mq, "*",
-                            this.mqOffsetMap.get(mq), 32);
+                    PullResult pullResult = consumer.pull(taskTopicConfig, "*",
+                        this.mqOffsetMap.get(taskTopicConfig), 32);
                     switch (pullResult.getPullStatus()) {
                         case FOUND: {
                             this.mqOffsetMap.put(taskTopicConfig, pullResult.getNextBeginOffset());
@@ -160,21 +139,21 @@ public class RmqSourceTask extends SourceTask {
                             List<MessageExt> msgs = pullResult.getMsgFoundList();
                             Schema schema = new Schema();
                             schema.setDataSource(this.config.getSourceRocketmq());
-                            schema.setName(mq.getTopic());
+                            schema.setName(taskTopicConfig.getTopic());
                             schema.setFields(new ArrayList<Field>());
                             schema.getFields().add(new Field(0,
-                                    FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
+                                FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
 
                             DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
                             dataEntryBuilder.timestamp(System.currentTimeMillis())
-                                    .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
+                                .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
                             dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(msgs));
                             SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-                                    ByteBuffer.wrap(RmqConstants.getPartition(
-                                            mq.getTopic(),
-                                            mq.getBrokerName(),
-                                            String.valueOf(mq.getQueueId())).getBytes("UTF-8")),
-                                    ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
+                                ByteBuffer.wrap(RmqConstants.getPartition(
+                                    taskTopicConfig.getTopic(),
+                                    taskTopicConfig.getBrokerName(),
+                                    String.valueOf(taskTopicConfig.getQueueId())).getBytes("UTF-8")),
+                                ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
                             );
                             sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
                             res.add(sourceDataEntry);
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
index c5a39e4..1086295 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
@@ -18,44 +18,15 @@ package org.apache.rocketmq.replicator.config;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 
-public class TaskTopicInfo {
+public class TaskTopicInfo extends MessageQueue{
 
-    private String sourceTopic;
-    private String brokerName;
-    private String queueId;
     private String targetTopic;
 
-    public TaskTopicInfo(String sourceTopic, String brokerName, String queueId, String targetTopic) {
-        this.sourceTopic = sourceTopic;
-        this.brokerName = brokerName;
-        this.queueId = queueId;
+    public TaskTopicInfo(String sourceTopic, String brokerName, int queueId, String targetTopic) {
+        super(sourceTopic, brokerName, queueId);
         this.targetTopic = targetTopic;
     }
 
-    public String getSourceTopic() {
-        return sourceTopic;
-    }
-
-    public void setSourceTopic(String sourceTopic) {
-        this.sourceTopic = sourceTopic;
-    }
-
-    public String getBrokerName() {
-        return brokerName;
-    }
-
-    public void setBrokerName(String brokerName) {
-        this.brokerName = brokerName;
-    }
-
-    public String getQueueId() {
-        return queueId;
-    }
-
-    public void setQueueId(String queueId) {
-        this.queueId = queueId;
-    }
-
     public String getTargetTopic() {
         return this.targetTopic;
     }
@@ -63,9 +34,4 @@ public class TaskTopicInfo {
     public void setTargetTopic(String targetTopic) {
         this.targetTopic = targetTopic;
     }
-
-    public MessageQueue convertMQ() {
-        return new MessageQueue(sourceTopic,
-           brokerName, Integer.parseInt(queueId));
-    }
 }