You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/17 06:34:08 UTC
[incubator-seatunnel] branch dev updated: [hotfix][engine][checkpoint] fix state restoring error (#3114)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c225ab144 [hotfix][engine][checkpoint] fix state restoring error (#3114)
c225ab144 is described below
commit c225ab1447bf4d1d6f8ec4863d81c73e2b4f0610
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Mon Oct 17 14:34:02 2022 +0800
[hotfix][engine][checkpoint] fix state restoring error (#3114)
---
.../server/checkpoint/CheckpointCoordinator.java | 19 +++++++--
.../engine/server/checkpoint/CheckpointPlan.java | 16 ++++++++
.../server/dag/physical/PhysicalPlanGenerator.java | 45 +++++++++++++++-------
.../engine/server/task/SeaTunnelTask.java | 16 ++++++--
.../operation/source/RestoredSplitOperation.java | 6 ++-
5 files changed, 80 insertions(+), 22 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 2a1ef88d7..df0e06122 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLETED_POINT_TYPE;
+import static org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan.COORDINATOR_INDEX;
import static org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
@@ -182,10 +183,20 @@ public class CheckpointCoordinator {
List<ActionSubtaskState> states = new ArrayList<>();
if (latestCompletedCheckpoint != null) {
final Integer currentParallelism = pipelineTasks.get(taskLocation.getTaskVertexId());
- final ActionState actionState = latestCompletedCheckpoint.getTaskStates().get(taskLocation.getTaskVertexId());
- for (int i = taskLocation.getTaskIndex(); i < actionState.getParallelism(); i += currentParallelism) {
- states.add(actionState.getSubtaskStates()[i]);
- }
+ plan.getSubtaskActions().get(taskLocation)
+ .forEach(tuple -> {
+ ActionState actionState = latestCompletedCheckpoint.getTaskStates().get(tuple.f0());
+ if (actionState == null) {
+ return;
+ }
+ if (COORDINATOR_INDEX.equals(tuple.f1())) {
+ states.add(actionState.getCoordinatorState());
+ return;
+ }
+ for (int i = tuple.f1(); i < actionState.getParallelism(); i += currentParallelism) {
+ states.add(actionState.getSubtaskStates()[i]);
+ }
+ });
}
checkpointManager.sendOperationToMemberNode(new NotifyTaskRestoreOperation(taskLocation, states));
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
index 0dd1c0dc5..2859c96c3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import com.hazelcast.jet.datamodel.Tuple2;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -37,6 +38,8 @@ import java.util.Set;
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class CheckpointPlan {
+ public static final Integer COORDINATOR_INDEX = -1;
+
private final int pipelineId;
/**
@@ -56,11 +59,19 @@ public class CheckpointPlan {
*/
private final Map<Long, Integer> pipelineActions;
+ /**
+ * <br> key: the subtask locations;
+ * <br> value: all actions in this subtask; f0: action id, f1: action index;
+ */
+ private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions;
+
public static final class Builder {
private final Set<TaskLocation> pipelineSubtasks = new HashSet<>();
private final Set<TaskLocation> startingSubtasks = new HashSet<>();
private final Map<Long, Integer> pipelineActions = new HashMap<>();
+ private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions = new HashMap<>();
+
private Builder() {
}
@@ -78,5 +89,10 @@ public class CheckpointPlan {
this.pipelineActions.putAll(pipelineActions);
return this;
}
+
+ public Builder subtaskActions(Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions) {
+ this.subtaskActions.putAll(subtaskActions);
+ return this;
+ }
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 90bf6336f..1e96c4ba5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -111,6 +111,12 @@ public class PhysicalPlanGenerator {
*/
private final Set<TaskLocation> startingTasks;
+ /**
+ * <br> key: the subtask locations;
+ * <br> value: all actions in this subtask; f0: action id, f1: action index;
+ */
+ private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions;
+
private final IMap<Object, Object> runningJobStateIMap;
private final IMap<Object, Object> runningJobStateTimestampsIMap;
@@ -132,6 +138,7 @@ public class PhysicalPlanGenerator {
// the checkpoint of a pipeline
this.pipelineTasks = new HashSet<>();
this.startingTasks = new HashSet<>();
+ this.subtaskActions = new HashMap<>();
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
}
@@ -147,6 +154,7 @@ public class PhysicalPlanGenerator {
Stream<SubPlan> subPlanStream = pipelines.stream().map(pipeline -> {
this.pipelineTasks.clear();
this.startingTasks.clear();
+ this.subtaskActions.clear();
final int pipelineId = pipeline.getId();
final List<ExecutionEdge> edges = pipeline.getEdges();
@@ -172,6 +180,7 @@ public class PhysicalPlanGenerator {
.pipelineSubtasks(pipelineTasks)
.startingSubtasks(startingTasks)
.pipelineActions(pipeline.getActions())
+ .subtaskActions(subtaskActions)
.build());
return new SubPlan(pipelineId,
totalPipelineNum,
@@ -207,10 +216,10 @@ public class PhysicalPlanGenerator {
.collect(Collectors.toList());
return collect.stream().map(s -> (SinkAction<?, ?, ?, ?>) s.getRightVertex().getAction())
- .map(s -> {
+ .map(sinkAction -> {
Optional<? extends SinkAggregatedCommitter<?, ?>> sinkAggregatedCommitter;
try {
- sinkAggregatedCommitter = s.getSink().createAggregatedCommitter();
+ sinkAggregatedCommitter = sinkAction.getSink().createAggregatedCommitter();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -222,22 +231,23 @@ public class PhysicalPlanGenerator {
new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
TaskLocation taskLocation = new TaskLocation(taskGroupLocation, taskTypeId, 0);
SinkAggregatedCommitterTask<?, ?> t =
- new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(), taskLocation, s,
+ new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(), taskLocation, sinkAction,
sinkAggregatedCommitter.get());
- committerTaskIDMap.put(s, taskLocation);
+ committerTaskIDMap.put(sinkAction, taskLocation);
// checkpoint
pipelineTasks.add(taskLocation);
+ subtaskActions.put(taskLocation, Collections.singleton(Tuple2.tuple2(sinkAction.getId(), -1)));
return new PhysicalVertex(atomicInteger.incrementAndGet(),
executorService,
collect.size(),
- new TaskGroupDefaultImpl(taskGroupLocation, s.getName() + "-AggregatedCommitterTask",
+ new TaskGroupDefaultImpl(taskGroupLocation, sinkAction.getName() + "-AggregatedCommitterTask",
Lists.newArrayList(t)),
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
- s.getJarUrls(),
+ sinkAction.getJarUrls(),
jobImmutableInformation,
initializationTimestamp,
nodeEngine,
@@ -268,7 +278,7 @@ public class PhysicalPlanGenerator {
SeaTunnelTask seaTunnelTask =
new TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i, flow);
// checkpoint
- pipelineTasks.add(taskLocation);
+ fillCheckpointPlan(seaTunnelTask);
t.add(new PhysicalVertex(
i,
executorService,
@@ -295,24 +305,25 @@ public class PhysicalPlanGenerator {
int totalPipelineNum) {
AtomicInteger atomicInteger = new AtomicInteger(-1);
- return sources.stream().map(s -> {
+ return sources.stream().map(sourceAction -> {
long taskGroupID = idGenerator.getNextId();
long taskTypeId = idGenerator.getNextId();
TaskGroupLocation taskGroupLocation =
new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
TaskLocation taskLocation = new TaskLocation(taskGroupLocation, taskTypeId, 0);
SourceSplitEnumeratorTask<?> t =
- new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(), taskLocation, s);
+ new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(), taskLocation, sourceAction);
// checkpoint
pipelineTasks.add(taskLocation);
startingTasks.add(taskLocation);
- enumeratorTaskIDMap.put(s, taskLocation);
+ subtaskActions.put(taskLocation, Collections.singleton(Tuple2.tuple2(sourceAction.getId(), -1)));
+ enumeratorTaskIDMap.put(sourceAction, taskLocation);
return new PhysicalVertex(
atomicInteger.incrementAndGet(),
executorService,
sources.size(),
- new TaskGroupDefaultImpl(taskGroupLocation, s.getName() + "-SplitEnumerator",
+ new TaskGroupDefaultImpl(taskGroupLocation, sourceAction.getName() + "-SplitEnumerator",
Lists.newArrayList(t)),
flakeIdGenerator,
pipelineIndex,
@@ -352,8 +363,6 @@ public class PhysicalPlanGenerator {
flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id -> idGenerator.getNextId());
final TaskLocation taskLocation =
new TaskLocation(taskGroupLocation, taskIDPrefix, finalParallelismIndex);
- // checkpoint
- pipelineTasks.add(taskLocation);
if (f instanceof PhysicalExecutionFlow) {
return new SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
taskLocation,
@@ -363,7 +372,7 @@ public class PhysicalPlanGenerator {
taskLocation,
finalParallelismIndex, f);
}
- }).collect(Collectors.toList());
+ }).peek(this::fillCheckpointPlan).collect(Collectors.toList());
Set<URL> jars =
taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
@@ -408,6 +417,14 @@ public class PhysicalPlanGenerator {
}).collect(Collectors.toList());
}
+ private void fillCheckpointPlan(SeaTunnelTask task) {
+ pipelineTasks.add(task.getTaskLocation());
+ subtaskActions.put(task.getTaskLocation(),
+ task.getActionIds().stream()
+ .map(id -> Tuple2.tuple2(id, task.getTaskLocation().getTaskIndex()))
+ .collect(Collectors.toSet()));
+ }
+
/**
* set config for flow, some flow should have config support for execute on task.
*
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 5379e1d60..437ecd74b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.engine.common.utils.ConsumerWithException;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -75,6 +76,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public abstract class SeaTunnelTask extends AbstractTask {
@@ -231,21 +233,29 @@ public abstract class SeaTunnelTask extends AbstractTask {
@Override
public Set<URL> getJarsUrl() {
+ return getFlowInfo((action, set) -> set.addAll(action.getJarUrls()));
+ }
+
+ public Set<Long> getActionIds() {
+ return getFlowInfo((action, set) -> set.add(action.getId()));
+ }
+
+ private <T> Set<T> getFlowInfo(BiConsumer<Action, Set<T>> function) {
List<Flow> now = new ArrayList<>();
now.add(executionFlow);
- Set<URL> urls = new HashSet<>();
+ Set<T> result = new HashSet<>();
while (!now.isEmpty()) {
final List<Flow> next = new ArrayList<>();
now.forEach(n -> {
if (n instanceof PhysicalExecutionFlow) {
- urls.addAll(((PhysicalExecutionFlow) n).getAction().getJarUrls());
+ function.accept(((PhysicalExecutionFlow) n).getAction(), result);
}
next.addAll(n.getNext());
});
now.clear();
now.addAll(next);
}
- return urls;
+ return result;
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 30df27151..4da0c2896 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -34,6 +34,7 @@ import com.hazelcast.nio.ObjectDataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
public class RestoredSplitOperation extends TaskOperation {
@@ -78,7 +79,10 @@ public class RestoredSplitOperation extends TaskOperation {
SeaTunnelServer server = getService();
TaskExecutionService taskExecutionService = server.getTaskExecutionService();
ClassLoader classLoader = taskExecutionService.getExecutionContext(taskLocation.getTaskGroupLocation()).getClassLoader();
- List<SourceSplit> deserialize = Arrays.asList(SerializationUtils.deserialize(splits, classLoader));
+
+ List<SourceSplit> deserialize = Arrays.stream((Object[]) SerializationUtils.deserialize(splits, classLoader))
+ .map(o -> (SourceSplit) o)
+ .collect(Collectors.toList());
RetryUtils.retryWithException(() -> {
SourceSplitEnumeratorTask<SourceSplit> task = taskExecutionService.getTask(taskLocation);
task.addSplitsBack(deserialize, subtaskIndex);