You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/17 06:34:08 UTC

[incubator-seatunnel] branch dev updated: [hotfix][engine][checkpoint] fix state restoring error (#3114)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c225ab144 [hotfix][engine][checkpoint] fix state restoring error (#3114)
c225ab144 is described below

commit c225ab1447bf4d1d6f8ec4863d81c73e2b4f0610
Author: Zongwen Li <zo...@apache.org>
AuthorDate: Mon Oct 17 14:34:02 2022 +0800

    [hotfix][engine][checkpoint] fix state restoring error (#3114)
---
 .../server/checkpoint/CheckpointCoordinator.java   | 19 +++++++--
 .../engine/server/checkpoint/CheckpointPlan.java   | 16 ++++++++
 .../server/dag/physical/PhysicalPlanGenerator.java | 45 +++++++++++++++-------
 .../engine/server/task/SeaTunnelTask.java          | 16 ++++++--
 .../operation/source/RestoredSplitOperation.java   |  6 ++-
 5 files changed, 80 insertions(+), 22 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 2a1ef88d7..df0e06122 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
 
 import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneakyThrow;
 import static org.apache.seatunnel.engine.core.checkpoint.CheckpointType.COMPLETED_POINT_TYPE;
+import static org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan.COORDINATOR_INDEX;
 import static org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState.READY_START;
 
 import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
@@ -182,10 +183,20 @@ public class CheckpointCoordinator {
         List<ActionSubtaskState> states = new ArrayList<>();
         if (latestCompletedCheckpoint != null) {
             final Integer currentParallelism = pipelineTasks.get(taskLocation.getTaskVertexId());
-            final ActionState actionState = latestCompletedCheckpoint.getTaskStates().get(taskLocation.getTaskVertexId());
-            for (int i = taskLocation.getTaskIndex(); i < actionState.getParallelism(); i += currentParallelism) {
-                states.add(actionState.getSubtaskStates()[i]);
-            }
+            plan.getSubtaskActions().get(taskLocation)
+                .forEach(tuple -> {
+                    ActionState actionState = latestCompletedCheckpoint.getTaskStates().get(tuple.f0());
+                    if (actionState == null) {
+                        return;
+                    }
+                    if (COORDINATOR_INDEX.equals(tuple.f1())) {
+                        states.add(actionState.getCoordinatorState());
+                        return;
+                    }
+                    for (int i = tuple.f1(); i < actionState.getParallelism(); i += currentParallelism) {
+                        states.add(actionState.getSubtaskStates()[i]);
+                    }
+                });
         }
         checkpointManager.sendOperationToMemberNode(new NotifyTaskRestoreOperation(taskLocation, states));
     }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
index 0dd1c0dc5..2859c96c3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
 
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 
+import com.hazelcast.jet.datamodel.Tuple2;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -37,6 +38,8 @@ import java.util.Set;
 @AllArgsConstructor(access = AccessLevel.PRIVATE)
 public class CheckpointPlan {
 
+    public static final Integer COORDINATOR_INDEX = -1;
+
     private final int pipelineId;
 
     /**
@@ -56,11 +59,19 @@ public class CheckpointPlan {
      */
     private final Map<Long, Integer> pipelineActions;
 
+    /**
+     * <br> key: the subtask locations;
+     * <br> value: all actions in this subtask; f0: action id, f1: action index;
+     */
+    private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions;
+
     public static final class Builder {
         private final Set<TaskLocation> pipelineSubtasks = new HashSet<>();
         private final Set<TaskLocation> startingSubtasks = new HashSet<>();
         private final Map<Long, Integer> pipelineActions = new HashMap<>();
 
+        private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions = new HashMap<>();
+
         private Builder() {
         }
 
@@ -78,5 +89,10 @@ public class CheckpointPlan {
             this.pipelineActions.putAll(pipelineActions);
             return this;
         }
+
+        public Builder subtaskActions(Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions) {
+            this.subtaskActions.putAll(subtaskActions);
+            return this;
+        }
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 90bf6336f..1e96c4ba5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -111,6 +111,12 @@ public class PhysicalPlanGenerator {
      */
     private final Set<TaskLocation> startingTasks;
 
+    /**
+     * <br> key: the subtask locations;
+     * <br> value: all actions in this subtask; f0: action id, f1: action index;
+     */
+    private final Map<TaskLocation, Set<Tuple2<Long, Integer>>> subtaskActions;
+
     private final IMap<Object, Object> runningJobStateIMap;
 
     private final IMap<Object, Object> runningJobStateTimestampsIMap;
@@ -132,6 +138,7 @@ public class PhysicalPlanGenerator {
         // the checkpoint of a pipeline
         this.pipelineTasks = new HashSet<>();
         this.startingTasks = new HashSet<>();
+        this.subtaskActions = new HashMap<>();
         this.runningJobStateIMap = runningJobStateIMap;
         this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
     }
@@ -147,6 +154,7 @@ public class PhysicalPlanGenerator {
         Stream<SubPlan> subPlanStream = pipelines.stream().map(pipeline -> {
             this.pipelineTasks.clear();
             this.startingTasks.clear();
+            this.subtaskActions.clear();
             final int pipelineId = pipeline.getId();
             final List<ExecutionEdge> edges = pipeline.getEdges();
 
@@ -172,6 +180,7 @@ public class PhysicalPlanGenerator {
                     .pipelineSubtasks(pipelineTasks)
                     .startingSubtasks(startingTasks)
                     .pipelineActions(pipeline.getActions())
+                    .subtaskActions(subtaskActions)
                     .build());
             return new SubPlan(pipelineId,
                 totalPipelineNum,
@@ -207,10 +216,10 @@ public class PhysicalPlanGenerator {
             .collect(Collectors.toList());
 
         return collect.stream().map(s -> (SinkAction<?, ?, ?, ?>) s.getRightVertex().getAction())
-            .map(s -> {
+            .map(sinkAction -> {
                 Optional<? extends SinkAggregatedCommitter<?, ?>> sinkAggregatedCommitter;
                 try {
-                    sinkAggregatedCommitter = s.getSink().createAggregatedCommitter();
+                    sinkAggregatedCommitter = sinkAction.getSink().createAggregatedCommitter();
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -222,22 +231,23 @@ public class PhysicalPlanGenerator {
                         new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
                     TaskLocation taskLocation = new TaskLocation(taskGroupLocation, taskTypeId, 0);
                     SinkAggregatedCommitterTask<?, ?> t =
-                        new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(), taskLocation, s,
+                        new SinkAggregatedCommitterTask(jobImmutableInformation.getJobId(), taskLocation, sinkAction,
                             sinkAggregatedCommitter.get());
-                    committerTaskIDMap.put(s, taskLocation);
+                    committerTaskIDMap.put(sinkAction, taskLocation);
 
                     // checkpoint
                     pipelineTasks.add(taskLocation);
+                    subtaskActions.put(taskLocation, Collections.singleton(Tuple2.tuple2(sinkAction.getId(), -1)));
 
                     return new PhysicalVertex(atomicInteger.incrementAndGet(),
                         executorService,
                         collect.size(),
-                        new TaskGroupDefaultImpl(taskGroupLocation, s.getName() + "-AggregatedCommitterTask",
+                        new TaskGroupDefaultImpl(taskGroupLocation, sinkAction.getName() + "-AggregatedCommitterTask",
                             Lists.newArrayList(t)),
                         flakeIdGenerator,
                         pipelineIndex,
                         totalPipelineNum,
-                        s.getJarUrls(),
+                        sinkAction.getJarUrls(),
                         jobImmutableInformation,
                         initializationTimestamp,
                         nodeEngine,
@@ -268,7 +278,7 @@ public class PhysicalPlanGenerator {
                     SeaTunnelTask seaTunnelTask =
                         new TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i, flow);
                     // checkpoint
-                    pipelineTasks.add(taskLocation);
+                    fillCheckpointPlan(seaTunnelTask);
                     t.add(new PhysicalVertex(
                         i,
                         executorService,
@@ -295,24 +305,25 @@ public class PhysicalPlanGenerator {
                                                    int totalPipelineNum) {
         AtomicInteger atomicInteger = new AtomicInteger(-1);
 
-        return sources.stream().map(s -> {
+        return sources.stream().map(sourceAction -> {
             long taskGroupID = idGenerator.getNextId();
             long taskTypeId = idGenerator.getNextId();
             TaskGroupLocation taskGroupLocation =
                 new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
             TaskLocation taskLocation = new TaskLocation(taskGroupLocation, taskTypeId, 0);
             SourceSplitEnumeratorTask<?> t =
-                new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(), taskLocation, s);
+                new SourceSplitEnumeratorTask<>(jobImmutableInformation.getJobId(), taskLocation, sourceAction);
             // checkpoint
             pipelineTasks.add(taskLocation);
             startingTasks.add(taskLocation);
-            enumeratorTaskIDMap.put(s, taskLocation);
+            subtaskActions.put(taskLocation, Collections.singleton(Tuple2.tuple2(sourceAction.getId(), -1)));
+            enumeratorTaskIDMap.put(sourceAction, taskLocation);
 
             return new PhysicalVertex(
                 atomicInteger.incrementAndGet(),
                 executorService,
                 sources.size(),
-                new TaskGroupDefaultImpl(taskGroupLocation, s.getName() + "-SplitEnumerator",
+                new TaskGroupDefaultImpl(taskGroupLocation, sourceAction.getName() + "-SplitEnumerator",
                     Lists.newArrayList(t)),
                 flakeIdGenerator,
                 pipelineIndex,
@@ -352,8 +363,6 @@ public class PhysicalPlanGenerator {
                                 flowTaskIDPrefixMap.computeIfAbsent(f.getFlowID(), id -> idGenerator.getNextId());
                             final TaskLocation taskLocation =
                                 new TaskLocation(taskGroupLocation, taskIDPrefix, finalParallelismIndex);
-                            // checkpoint
-                            pipelineTasks.add(taskLocation);
                             if (f instanceof PhysicalExecutionFlow) {
                                 return new SourceSeaTunnelTask<>(jobImmutableInformation.getJobId(),
                                     taskLocation,
@@ -363,7 +372,7 @@ public class PhysicalPlanGenerator {
                                     taskLocation,
                                     finalParallelismIndex, f);
                             }
-                        }).collect(Collectors.toList());
+                        }).peek(this::fillCheckpointPlan).collect(Collectors.toList());
                     Set<URL> jars =
                         taskList.stream().flatMap(task -> task.getJarsUrl().stream()).collect(Collectors.toSet());
 
@@ -408,6 +417,14 @@ public class PhysicalPlanGenerator {
             }).collect(Collectors.toList());
     }
 
+    private void fillCheckpointPlan(SeaTunnelTask task) {
+        pipelineTasks.add(task.getTaskLocation());
+        subtaskActions.put(task.getTaskLocation(),
+            task.getActionIds().stream()
+                .map(id -> Tuple2.tuple2(id, task.getTaskLocation().getTaskIndex()))
+                .collect(Collectors.toSet()));
+    }
+
     /**
      * set config for flow, some flow should have config support for execute on task.
      *
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index 5379e1d60..437ecd74b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -31,6 +31,7 @@ import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.engine.common.utils.ConsumerWithException;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.PartitionTransformAction;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
@@ -75,6 +76,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 public abstract class SeaTunnelTask extends AbstractTask {
@@ -231,21 +233,29 @@ public abstract class SeaTunnelTask extends AbstractTask {
 
     @Override
     public Set<URL> getJarsUrl() {
+        return getFlowInfo((action, set) -> set.addAll(action.getJarUrls()));
+    }
+
+    public Set<Long> getActionIds() {
+        return getFlowInfo((action, set) -> set.add(action.getId()));
+    }
+
+    private <T> Set<T> getFlowInfo(BiConsumer<Action, Set<T>> function) {
         List<Flow> now = new ArrayList<>();
         now.add(executionFlow);
-        Set<URL> urls = new HashSet<>();
+        Set<T> result = new HashSet<>();
         while (!now.isEmpty()) {
             final List<Flow> next = new ArrayList<>();
             now.forEach(n -> {
                 if (n instanceof PhysicalExecutionFlow) {
-                    urls.addAll(((PhysicalExecutionFlow) n).getAction().getJarUrls());
+                    function.accept(((PhysicalExecutionFlow) n).getAction(), result);
                 }
                 next.addAll(n.getNext());
             });
             now.clear();
             now.addAll(next);
         }
-        return urls;
+        return result;
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 30df27151..4da0c2896 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -34,6 +34,7 @@ import com.hazelcast.nio.ObjectDataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class RestoredSplitOperation extends TaskOperation {
 
@@ -78,7 +79,10 @@ public class RestoredSplitOperation extends TaskOperation {
         SeaTunnelServer server = getService();
         TaskExecutionService taskExecutionService = server.getTaskExecutionService();
         ClassLoader classLoader = taskExecutionService.getExecutionContext(taskLocation.getTaskGroupLocation()).getClassLoader();
-        List<SourceSplit> deserialize = Arrays.asList(SerializationUtils.deserialize(splits, classLoader));
+
+        List<SourceSplit> deserialize = Arrays.stream((Object[]) SerializationUtils.deserialize(splits, classLoader))
+            .map(o -> (SourceSplit) o)
+            .collect(Collectors.toList());
         RetryUtils.retryWithException(() -> {
             SourceSplitEnumeratorTask<SourceSplit> task = taskExecutionService.getTask(taskLocation);
             task.addSplitsBack(deserialize, subtaskIndex);