You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:31 UTC
[rocketmq-connect] 21/39: [ISSUES #434] Replicator support RocketMQConverter (#463)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit b3377a61e65b81e88824e657f4ddb1f75fb2187f
Author: zhoubo <87...@qq.com>
AuthorDate: Tue Nov 19 23:21:05 2019 +0800
[ISSUES #434] Replicator support RocketMQConverter (#463)
* Replicator support RocketMQConverter
* https://github.com/apache/rocketmq-externals/issues/434
* Replicator converter byte[] data to string
---
.../apache/rocketmq/replicator/RmqSourceTask.java | 24 ++++++++++++----------
1 file changed, 13 insertions(+), 11 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index d965898..b504e85 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -163,17 +163,19 @@ public class RmqSourceTask extends SourceTask {
DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
dataEntryBuilder.timestamp(System.currentTimeMillis())
.queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
- dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(msgs));
-
- SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap(RmqConstants.getPartition(
- taskTopicConfig.getTopic(),
- taskTopicConfig.getBrokerName(),
- String.valueOf(taskTopicConfig.getQueueId())).getBytes(StandardCharsets.UTF_8)),
- ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
- );
- sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
- res.add(sourceDataEntry);
+ for (MessageExt msg : msgs) {
+ dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
+ SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+ ByteBuffer.wrap(RmqConstants.getPartition(
+ taskTopicConfig.getTopic(),
+ taskTopicConfig.getBrokerName(),
+ String.valueOf(taskTopicConfig.getQueueId())).getBytes(StandardCharsets.UTF_8)),
+ ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
+ );
+ sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
+ res.add(sourceDataEntry);
+ }
+
break;
}
default: