You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/04/25 05:58:14 UTC
[rocketmq-connect] branch master updated: [ISSUE #90] Fix WorkerDirectTask put position may partially store information lost (#94)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new ec75630 [ISSUE #90] Fix WorkerDirectTask put position may partially store information lost (#94)
ec75630 is described below
commit ec756308750cdbedc432dd9a7d28f4c56539251c
Author: Slideee <64...@qq.com>
AuthorDate: Mon Apr 25 13:58:10 2022 +0800
[ISSUE #90] Fix WorkerDirectTask put position may partially store information lost (#94)
Co-authored-by: yechun <ye...@corp.netease.com>
---
.../runtime/connectorwrapper/WorkerDirectTask.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index fab036a..d6f3ed7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.collections.MapUtils;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
@@ -127,19 +128,20 @@ public class WorkerDirectTask implements WorkerTask {
private void sendRecord(Collection<ConnectRecord> sourceDataEntries) {
List<ConnectRecord> sinkDataEntries = new ArrayList<>(sourceDataEntries.size());
- RecordPartition partition = null;
- RecordOffset offset = null;
+ Map<RecordPartition, RecordOffset> map = new HashMap<>();
for (ConnectRecord sourceDataEntry : sourceDataEntries) {
sinkDataEntries.add(sourceDataEntry);
- partition = sourceDataEntry.getPosition().getPartition();
- offset = sourceDataEntry.getPosition().getOffset();
+ RecordPartition recordPartition = sourceDataEntry.getPosition().getPartition();
+ RecordOffset recordOffset = sourceDataEntry.getPosition().getOffset();
+ if (null != recordPartition && null != recordOffset) {
+ map.put(recordPartition, recordOffset);
+ }
}
-
try {
sinkTask.put(sinkDataEntries);
try {
- if (null != partition && null != offset) {
- positionManagementService.putPosition(partition, offset);
+ if (!MapUtils.isEmpty(map)) {
+ map.forEach(positionManagementService::putPosition);
}
} catch (Exception e) {
log.error("Source task save position info failed.", e);