You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/05 12:34:25 UTC

[incubator-seatunnel] branch dev updated: [Bug][engine] fix task can not run with parallelism > 1 (#2984)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4fb1fb27a [Bug][engine] fix task can not run with parallelism > 1 (#2984)
4fb1fb27a is described below

commit 4fb1fb27a571025bbed30eea7c020fa116701213
Author: liugddx <80...@qq.com>
AuthorDate: Wed Oct 5 20:34:19 2022 +0800

    [Bug][engine] fix task can not run with parallelism > 1 (#2984)
---
 .../engine/server/execution/TaskLocation.java      |  5 +++--
 .../server/task/SourceSplitEnumeratorTask.java     | 23 ++++++++++++++++------
 .../context/SeaTunnelSplitEnumeratorContext.java   | 16 +++++++--------
 3 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
index e45b81fab..64737254c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
@@ -32,6 +32,7 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
 
     private TaskGroupLocation taskGroupLocation;
     private long taskID;
+    private int index;
 
     public TaskLocation() {
     }
@@ -39,6 +40,7 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
     public TaskLocation(TaskGroupLocation taskGroupLocation, long idPrefix, int index) {
         this.taskGroupLocation = taskGroupLocation;
         this.taskID = mixIDPrefixAndIndex(idPrefix, index);
+        this.index = index;
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
@@ -67,9 +69,8 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
         return taskID / 10000;
     }
 
-    @SuppressWarnings("checkstyle:MagicNumber")
     public int getTaskIndex() {
-        return (int) (taskID % 10000);
+        return index;
     }
 
     public void setTaskGroupLocation(TaskGroupLocation taskGroupLocation) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index eff32e833..ebc253c6a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -81,6 +81,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     private Set<Long> unfinishedReaders;
     private Map<TaskLocation, Address> taskMemberMapping;
     private Map<Long, TaskLocation> taskIDToTaskLocationMapping;
+    private Map<Integer, TaskLocation> taskIndexToTaskLocationMapping;
 
     private volatile SeaTunnelTaskState currState;
 
@@ -96,6 +97,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         enumeratorStateSerializer = this.source.getSource().getEnumeratorStateSerializer();
         taskMemberMapping = new ConcurrentHashMap<>();
         taskIDToTaskLocationMapping = new ConcurrentHashMap<>();
+        taskIndexToTaskLocationMapping = new ConcurrentHashMap<>();
         maxReaderSize = source.getParallelism();
         unfinishedReaders = new CopyOnWriteArraySet<>();
     }
@@ -166,7 +168,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     public void receivedReader(TaskLocation readerId, Address memberAddr) {
         LOGGER.info("received reader register, readerID: " + readerId);
         this.addTaskMemberMapping(readerId, memberAddr);
-        enumerator.registerReader((int) readerId.getTaskID());
+        enumerator.registerReader(readerId.getTaskIndex());
         if (maxReaderSize == taskMemberMapping.size()) {
             readerRegisterComplete = true;
         }
@@ -176,12 +178,13 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         enumerator.handleSplitRequest((int) taskID);
     }
 
-    public void addTaskMemberMapping(TaskLocation taskID, Address memberAddr) {
-        taskMemberMapping.put(taskID, memberAddr);
+    public void addTaskMemberMapping(TaskLocation taskID, Address memberAdder) {
+        taskMemberMapping.put(taskID, memberAdder);
         taskIDToTaskLocationMapping.put(taskID.getTaskID(), taskID);
+        taskIndexToTaskLocationMapping.put(taskID.getTaskIndex(), taskID);
     }
 
-    public Address getTaskMemberAddr(long taskID) {
+    public Address getTaskMemberAddress(long taskID) {
         return taskMemberMapping.get(taskIDToTaskLocationMapping.get(taskID));
     }
 
@@ -189,6 +192,14 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         return taskIDToTaskLocationMapping.get(taskID);
     }
 
+    public Address getTaskMemberAddressByIndex(int taskIndex) {
+        return taskMemberMapping.get(taskIndexToTaskLocationMapping.get(taskIndex));
+    }
+
+    public TaskLocation getTaskMemberLocationByIndex(int taskIndex) {
+        return taskIndexToTaskLocationMapping.get(taskIndex);
+    }
+
     public void readerFinished(long taskID) {
         unfinishedReaders.remove(taskID);
         if (unfinishedReaders.isEmpty()) {
@@ -244,8 +255,8 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         }
     }
 
-    public Set<Long> getRegisteredReaders() {
-        return taskMemberMapping.keySet().stream().map(TaskLocation::getTaskID).collect(Collectors.toSet());
+    public Set<Integer> getRegisteredReaders() {
+        return taskMemberMapping.keySet().stream().map(TaskLocation::getTaskIndex).collect(Collectors.toSet());
     }
 
     private void sendToAllReader(Function<TaskLocation, Operation> function) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index e1fe6c1ff..8bc159174 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -25,9 +25,9 @@ import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
 import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit> implements SourceSplitEnumerator.Context<SplitT> {
 
@@ -47,20 +47,20 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit> impleme
 
     @Override
     public Set<Integer> registeredReaders() {
-        return task.getRegisteredReaders().stream().map(Long::intValue).collect(Collectors.toSet());
+        return new HashSet<>(task.getRegisteredReaders());
     }
 
     @Override
-    public void assignSplit(int subtaskId, List<SplitT> splits) {
-        task.getExecutionContext().sendToMember(new AssignSplitOperation<>(task.getTaskMemberLocation(subtaskId),
-            SerializationUtils.serialize(splits.toArray())), task.getTaskMemberAddr(subtaskId));
+    public void assignSplit(int subtaskIndex, List<SplitT> splits) {
+        task.getExecutionContext().sendToMember(new AssignSplitOperation<>(task.getTaskMemberLocationByIndex(subtaskIndex),
+            SerializationUtils.serialize(splits.toArray())), task.getTaskMemberAddressByIndex(subtaskIndex));
     }
 
     @Override
-    public void signalNoMoreSplits(int subtaskId) {
+    public void signalNoMoreSplits(int subtaskIndex) {
         task.getExecutionContext().sendToMember(
-            new AssignSplitOperation<>(task.getTaskMemberLocation(subtaskId), SerializationUtils.serialize(Collections.emptyList().toArray())),
-            task.getTaskMemberAddr(subtaskId));
+            new AssignSplitOperation<>(task.getTaskMemberLocationByIndex(subtaskIndex), SerializationUtils.serialize(Collections.emptyList().toArray())),
+            task.getTaskMemberAddressByIndex(subtaskIndex));
     }
 
     @Override