You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:22 UTC
[rocketmq-connect] 12/39: extend messageQueue for TaskTopicInfo
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit ad38ec76cc737aef32fc5c5e8533af6f19fc72b4
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Tue Sep 17 14:24:31 2019 +0800
extend messageQueue for TaskTopicInfo
---
.../rocketmq/replicator/RmqSourceReplicator.java | 2 +-
.../apache/rocketmq/replicator/RmqSourceTask.java | 59 +++++++---------------
.../rocketmq/replicator/config/TaskTopicInfo.java | 40 ++-------------
3 files changed, 23 insertions(+), 78 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index f24cf63..e124f15 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -228,7 +228,7 @@ public class RmqSourceReplicator extends SourceConnector {
for (QueueData qd : topicRouteData.getQueueDatas()) {
if (brokerNameSet.contains(qd.getBrokerName())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
- TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), String.valueOf(i), targetTopic);
+ TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), i, targetTopic);
topicRouteMap.get(topic).add(taskTopicInfo);
}
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index c4d53fa..b16e585 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -47,6 +47,7 @@ public class RmqSourceTask extends SourceTask {
private volatile boolean started = false;
private Map<TaskTopicInfo, Long> mqOffsetMap;
+
public RmqSourceTask() {
this.config = new TaskConfig();
this.consumer = new DefaultMQPullConsumer();
@@ -76,36 +77,15 @@ public class RmqSourceTask extends SourceTask {
try {
this.consumer.start();
- for (TaskTopicInfo tti: topicList) {
- Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(tti.getSourceTopic());
- if (!tti.getQueueId().equals("")) {
- // divide task by queue
- for (MessageQueue mq: mqs) {
- if (Integer.valueOf(tti.getQueueId()) == mq.getQueueId()) {
- ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
- ByteBuffer.wrap(RmqConstants.getPartition(
- mq.getTopic(),
- mq.getBrokerName(),
- String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
-
- if (null != positionInfo && positionInfo.array().length > 0) {
- String positionJson = new String(positionInfo.array(), "UTF-8");
- JSONObject jsonObject = JSONObject.parseObject(positionJson);
- this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
- } else {
- this.config.setNextPosition(0L);
- }
- mqOffsetMap.put(tti, this.config.getNextPosition());
- }
- }
- } else {
- // divide task by topic
- for (MessageQueue mq: mqs) {
+ for (TaskTopicInfo tti : topicList) {
+ Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(tti.getTopic());
+ for (MessageQueue mq : mqs) {
+ if (Integer.valueOf(tti.getQueueId()) == mq.getQueueId()) {
ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
- ByteBuffer.wrap(RmqConstants.getPartition(
- mq.getTopic(),
- mq.getBrokerName(),
- String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
+ ByteBuffer.wrap(RmqConstants.getPartition(
+ mq.getTopic(),
+ mq.getBrokerName(),
+ String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
if (null != positionInfo && positionInfo.array().length > 0) {
String positionJson = new String(positionInfo.array(), "UTF-8");
@@ -149,9 +129,8 @@ public class RmqSourceTask extends SourceTask {
if (started) {
try {
for (TaskTopicInfo taskTopicConfig : this.mqOffsetMap.keySet()) {
- MessageQueue mq = taskTopicConfig.convertMQ();
- PullResult pullResult = consumer.pull(mq, "*",
- this.mqOffsetMap.get(mq), 32);
+ PullResult pullResult = consumer.pull(taskTopicConfig, "*",
+ this.mqOffsetMap.get(taskTopicConfig), 32);
switch (pullResult.getPullStatus()) {
case FOUND: {
this.mqOffsetMap.put(taskTopicConfig, pullResult.getNextBeginOffset());
@@ -160,21 +139,21 @@ public class RmqSourceTask extends SourceTask {
List<MessageExt> msgs = pullResult.getMsgFoundList();
Schema schema = new Schema();
schema.setDataSource(this.config.getSourceRocketmq());
- schema.setName(mq.getTopic());
+ schema.setName(taskTopicConfig.getTopic());
schema.setFields(new ArrayList<Field>());
schema.getFields().add(new Field(0,
- FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
+ FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
+ .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(msgs));
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap(RmqConstants.getPartition(
- mq.getTopic(),
- mq.getBrokerName(),
- String.valueOf(mq.getQueueId())).getBytes("UTF-8")),
- ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
+ ByteBuffer.wrap(RmqConstants.getPartition(
+ taskTopicConfig.getTopic(),
+ taskTopicConfig.getBrokerName(),
+ String.valueOf(taskTopicConfig.getQueueId())).getBytes("UTF-8")),
+ ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
);
sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
res.add(sourceDataEntry);
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
index c5a39e4..1086295 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
@@ -18,44 +18,15 @@ package org.apache.rocketmq.replicator.config;
import org.apache.rocketmq.common.message.MessageQueue;
-public class TaskTopicInfo {
+public class TaskTopicInfo extends MessageQueue{
- private String sourceTopic;
- private String brokerName;
- private String queueId;
private String targetTopic;
- public TaskTopicInfo(String sourceTopic, String brokerName, String queueId, String targetTopic) {
- this.sourceTopic = sourceTopic;
- this.brokerName = brokerName;
- this.queueId = queueId;
+ public TaskTopicInfo(String sourceTopic, String brokerName, int queueId, String targetTopic) {
+ super(sourceTopic, brokerName, queueId);
this.targetTopic = targetTopic;
}
- public String getSourceTopic() {
- return sourceTopic;
- }
-
- public void setSourceTopic(String sourceTopic) {
- this.sourceTopic = sourceTopic;
- }
-
- public String getBrokerName() {
- return brokerName;
- }
-
- public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
- }
-
- public String getQueueId() {
- return queueId;
- }
-
- public void setQueueId(String queueId) {
- this.queueId = queueId;
- }
-
public String getTargetTopic() {
return this.targetTopic;
}
@@ -63,9 +34,4 @@ public class TaskTopicInfo {
public void setTargetTopic(String targetTopic) {
this.targetTopic = targetTopic;
}
-
- public MessageQueue convertMQ() {
- return new MessageQueue(sourceTopic,
- brokerName, Integer.parseInt(queueId));
- }
}