You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:31 UTC

[rocketmq-connect] 21/39: [ISSUES #434] Replicator support RocketMQConverter (#463)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit b3377a61e65b81e88824e657f4ddb1f75fb2187f
Author: zhoubo <87...@qq.com>
AuthorDate: Tue Nov 19 23:21:05 2019 +0800

    [ISSUES #434] Replicator support RocketMQConverter (#463)
    
    *   Replicator support RocketMQConverter
    * https://github.com/apache/rocketmq-externals/issues/434
    
    * Replicator converts byte[] data to string
---
 .../apache/rocketmq/replicator/RmqSourceTask.java  | 24 ++++++++++++----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index d965898..b504e85 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -163,17 +163,19 @@ public class RmqSourceTask extends SourceTask {
                             DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
                             dataEntryBuilder.timestamp(System.currentTimeMillis())
                                 .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
-                            dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(msgs));
-
-                            SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-                                ByteBuffer.wrap(RmqConstants.getPartition(
-                                    taskTopicConfig.getTopic(),
-                                    taskTopicConfig.getBrokerName(),
-                                    String.valueOf(taskTopicConfig.getQueueId())).getBytes(StandardCharsets.UTF_8)),
-                                ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
-                            );
-                            sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
-                            res.add(sourceDataEntry);
+                            for (MessageExt msg : msgs) {
+                                dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
+                                SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
+                                    ByteBuffer.wrap(RmqConstants.getPartition(
+                                        taskTopicConfig.getTopic(),
+                                        taskTopicConfig.getBrokerName(),
+                                        String.valueOf(taskTopicConfig.getQueueId())).getBytes(StandardCharsets.UTF_8)),
+                                    ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8))
+                                );
+                                sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
+                                res.add(sourceDataEntry);
+                            }
+
                             break;
                         }
                         default: