You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/04/25 05:58:14 UTC

[rocketmq-connect] branch master updated: [ISSUE #90] Fix WorkerDirectTask put position may partially store information lost (#94)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new ec75630  [ISSUE #90] Fix WorkerDirectTask put position may partially store information lost (#94)
ec75630 is described below

commit ec756308750cdbedc432dd9a7d28f4c56539251c
Author: Slideee <64...@qq.com>
AuthorDate: Mon Apr 25 13:58:10 2022 +0800

    [ISSUE #90] Fix WorkerDirectTask put position may partially store information lost (#94)
    
    Co-authored-by: yechun <ye...@corp.netease.com>
---
 .../runtime/connectorwrapper/WorkerDirectTask.java       | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index fab036a..d6f3ed7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.collections.MapUtils;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
@@ -127,19 +128,20 @@ public class WorkerDirectTask implements WorkerTask {
 
     private void sendRecord(Collection<ConnectRecord> sourceDataEntries) {
         List<ConnectRecord> sinkDataEntries = new ArrayList<>(sourceDataEntries.size());
-        RecordPartition partition = null;
-        RecordOffset offset = null;
+        Map<RecordPartition, RecordOffset> map = new HashMap<>();
         for (ConnectRecord sourceDataEntry : sourceDataEntries) {
             sinkDataEntries.add(sourceDataEntry);
-            partition = sourceDataEntry.getPosition().getPartition();
-            offset = sourceDataEntry.getPosition().getOffset();
+            RecordPartition recordPartition = sourceDataEntry.getPosition().getPartition();
+            RecordOffset recordOffset = sourceDataEntry.getPosition().getOffset();
+            if (null != recordPartition && null != recordOffset) {
+                map.put(recordPartition, recordOffset);
+            }
         }
-
         try {
             sinkTask.put(sinkDataEntries);
             try {
-                if (null != partition && null != offset) {
-                    positionManagementService.putPosition(partition, offset);
+                if (!MapUtils.isEmpty(map)) {
+                    map.forEach(positionManagementService::putPosition);
                 }
             } catch (Exception e) {
                 log.error("Source task save position info failed.", e);