You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:43 UTC

[rocketmq-connect] 33/39: [Replicator] Fix message duplication problem (#692)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 8e66fd6ec768c6e60e5520e363c46d6c5593370e
Author: Git_Yang <30...@users.noreply.github.com>
AuthorDate: Mon Mar 22 14:38:36 2021 +0800

    [Replicator] Fix message duplication problem (#692)
    
    Signed-off-by: zhangyang <Gi...@163.com>
---
 src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 3e8d78b..da7013a 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -160,10 +160,10 @@ public class RmqSourceTask extends SourceTask {
                             schema.getFields().add(new Field(0,
                                 FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
 
-                            DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
-                            dataEntryBuilder.timestamp(System.currentTimeMillis())
-                                .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
                             for (MessageExt msg : msgs) {
+                                DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+                                dataEntryBuilder.timestamp(System.currentTimeMillis())
+                                        .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
                                 dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
                                 SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
                                     ByteBuffer.wrap(RmqConstants.getPartition(