You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/05 12:34:25 UTC
[incubator-seatunnel] branch dev updated: [Bug][engine] fix task can not run with parallelism > 1 (#2984)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 4fb1fb27a [Bug][engine] fix task can not run with parallelism > 1 (#2984)
4fb1fb27a is described below
commit 4fb1fb27a571025bbed30eea7c020fa116701213
Author: liugddx <80...@qq.com>
AuthorDate: Wed Oct 5 20:34:19 2022 +0800
[Bug][engine] fix task can not run with parallelism > 1 (#2984)
---
.../engine/server/execution/TaskLocation.java | 5 +++--
.../server/task/SourceSplitEnumeratorTask.java | 23 ++++++++++++++++------
.../context/SeaTunnelSplitEnumeratorContext.java | 16 +++++++--------
3 files changed, 28 insertions(+), 16 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
index e45b81fab..64737254c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskLocation.java
@@ -32,6 +32,7 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
private TaskGroupLocation taskGroupLocation;
private long taskID;
+ private int index;
public TaskLocation() {
}
@@ -39,6 +40,7 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
public TaskLocation(TaskGroupLocation taskGroupLocation, long idPrefix, int index) {
this.taskGroupLocation = taskGroupLocation;
this.taskID = mixIDPrefixAndIndex(idPrefix, index);
+ this.index = index;
}
@SuppressWarnings("checkstyle:MagicNumber")
@@ -67,9 +69,8 @@ public class TaskLocation implements IdentifiedDataSerializable, Serializable {
return taskID / 10000;
}
- @SuppressWarnings("checkstyle:MagicNumber")
public int getTaskIndex() {
- return (int) (taskID % 10000);
+ return index;
}
public void setTaskGroupLocation(TaskGroupLocation taskGroupLocation) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index eff32e833..ebc253c6a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -81,6 +81,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
private Set<Long> unfinishedReaders;
private Map<TaskLocation, Address> taskMemberMapping;
private Map<Long, TaskLocation> taskIDToTaskLocationMapping;
+ private Map<Integer, TaskLocation> taskIndexToTaskLocationMapping;
private volatile SeaTunnelTaskState currState;
@@ -96,6 +97,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
enumeratorStateSerializer = this.source.getSource().getEnumeratorStateSerializer();
taskMemberMapping = new ConcurrentHashMap<>();
taskIDToTaskLocationMapping = new ConcurrentHashMap<>();
+ taskIndexToTaskLocationMapping = new ConcurrentHashMap<>();
maxReaderSize = source.getParallelism();
unfinishedReaders = new CopyOnWriteArraySet<>();
}
@@ -166,7 +168,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
public void receivedReader(TaskLocation readerId, Address memberAddr) {
LOGGER.info("received reader register, readerID: " + readerId);
this.addTaskMemberMapping(readerId, memberAddr);
- enumerator.registerReader((int) readerId.getTaskID());
+ enumerator.registerReader(readerId.getTaskIndex());
if (maxReaderSize == taskMemberMapping.size()) {
readerRegisterComplete = true;
}
@@ -176,12 +178,13 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
enumerator.handleSplitRequest((int) taskID);
}
- public void addTaskMemberMapping(TaskLocation taskID, Address memberAddr) {
- taskMemberMapping.put(taskID, memberAddr);
+ public void addTaskMemberMapping(TaskLocation taskID, Address memberAdder) {
+ taskMemberMapping.put(taskID, memberAdder);
taskIDToTaskLocationMapping.put(taskID.getTaskID(), taskID);
+ taskIndexToTaskLocationMapping.put(taskID.getTaskIndex(), taskID);
}
- public Address getTaskMemberAddr(long taskID) {
+ public Address getTaskMemberAddress(long taskID) {
return taskMemberMapping.get(taskIDToTaskLocationMapping.get(taskID));
}
@@ -189,6 +192,14 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
return taskIDToTaskLocationMapping.get(taskID);
}
+ public Address getTaskMemberAddressByIndex(int taskIndex) {
+ return taskMemberMapping.get(taskIndexToTaskLocationMapping.get(taskIndex));
+ }
+
+ public TaskLocation getTaskMemberLocationByIndex(int taskIndex) {
+ return taskIndexToTaskLocationMapping.get(taskIndex);
+ }
+
public void readerFinished(long taskID) {
unfinishedReaders.remove(taskID);
if (unfinishedReaders.isEmpty()) {
@@ -244,8 +255,8 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
}
}
- public Set<Long> getRegisteredReaders() {
- return taskMemberMapping.keySet().stream().map(TaskLocation::getTaskID).collect(Collectors.toSet());
+ public Set<Integer> getRegisteredReaders() {
+ return taskMemberMapping.keySet().stream().map(TaskLocation::getTaskIndex).collect(Collectors.toSet());
}
private void sendToAllReader(Function<TaskLocation, Operation> function) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index e1fe6c1ff..8bc159174 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -25,9 +25,9 @@ import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit> implements SourceSplitEnumerator.Context<SplitT> {
@@ -47,20 +47,20 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit> impleme
@Override
public Set<Integer> registeredReaders() {
- return task.getRegisteredReaders().stream().map(Long::intValue).collect(Collectors.toSet());
+ return new HashSet<>(task.getRegisteredReaders());
}
@Override
- public void assignSplit(int subtaskId, List<SplitT> splits) {
- task.getExecutionContext().sendToMember(new AssignSplitOperation<>(task.getTaskMemberLocation(subtaskId),
- SerializationUtils.serialize(splits.toArray())), task.getTaskMemberAddr(subtaskId));
+ public void assignSplit(int subtaskIndex, List<SplitT> splits) {
+ task.getExecutionContext().sendToMember(new AssignSplitOperation<>(task.getTaskMemberLocationByIndex(subtaskIndex),
+ SerializationUtils.serialize(splits.toArray())), task.getTaskMemberAddressByIndex(subtaskIndex));
}
@Override
- public void signalNoMoreSplits(int subtaskId) {
+ public void signalNoMoreSplits(int subtaskIndex) {
task.getExecutionContext().sendToMember(
- new AssignSplitOperation<>(task.getTaskMemberLocation(subtaskId), SerializationUtils.serialize(Collections.emptyList().toArray())),
- task.getTaskMemberAddr(subtaskId));
+ new AssignSplitOperation<>(task.getTaskMemberLocationByIndex(subtaskIndex), SerializationUtils.serialize(Collections.emptyList().toArray())),
+ task.getTaskMemberAddressByIndex(subtaskIndex));
}
@Override