You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:43 UTC
[rocketmq-connect] 33/39: [Replicator] Fix message duplication problem (#692)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 8e66fd6ec768c6e60e5520e363c46d6c5593370e
Author: Git_Yang <30...@users.noreply.github.com>
AuthorDate: Mon Mar 22 14:38:36 2021 +0800
[Replicator] Fix message duplication problem (#692)
Signed-off-by: zhangyang <Gi...@163.com>
---
src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 3e8d78b..da7013a 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -160,10 +160,10 @@ public class RmqSourceTask extends SourceTask {
schema.getFields().add(new Field(0,
FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
- DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
- dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
for (MessageExt msg : msgs) {
+ DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
+ dataEntryBuilder.timestamp(System.currentTimeMillis())
+ .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), new String(msg.getBody()));
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
ByteBuffer.wrap(RmqConstants.getPartition(