You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/18 08:53:34 UTC

[incubator-seatunnel] branch dev updated: [hotfix][engine][task] Fix coordinator task running error (#3126)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ae6fa9907 [hotfix][engine][task] Fix coordinator task running error (#3126)
ae6fa9907 is described below

commit ae6fa990787c9474b907c7a72be19b56b3b3fbf0
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Tue Oct 18 16:53:26 2022 +0800

    [hotfix][engine][task] Fix coordinator task running error (#3126)
---
 .../engine/server/task/SinkAggregatedCommitterTask.java      | 12 ++++++++++--
 .../engine/server/task/SourceSplitEnumeratorTask.java        |  1 +
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 5d8eef25b..8497c5883 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -45,6 +45,7 @@ import lombok.NonNull;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -213,8 +214,15 @@ public class SinkAggregatedCommitterTask<CommandInfoT, AggregatedCommitInfoT> ex
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        aggregatedCommitter.commit(checkpointCommitInfoMap.get(checkpointId));
-        checkpointCommitInfoMap.remove(checkpointId);
+        List<AggregatedCommitInfoT> aggregatedCommitInfo = new ArrayList<>();
+        checkpointCommitInfoMap.forEach((key, value) -> {
+            if (key > checkpointId) {
+                return;
+            }
+            aggregatedCommitInfo.addAll(value);
+            checkpointCommitInfoMap.remove(key);
+        });
+        aggregatedCommitter.commit(aggregatedCommitInfo);
         if (prepareCloseStatus) {
             closeCall();
         }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 83fb06b82..608a0fb1c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -180,6 +180,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         taskMemberMapping.put(taskID, memberAdder);
         taskIDToTaskLocationMapping.put(taskID.getTaskID(), taskID);
         taskIndexToTaskLocationMapping.put(taskID.getTaskIndex(), taskID);
+        unfinishedReaders.add(taskID.getTaskID());
     }
 
     public Address getTaskMemberAddress(long taskID) {