You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/23 12:28:08 UTC
[incubator-seatunnel] branch st-engine updated: [engine][checkpoint] checkpoint base classes (#2478)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new a01e591a2 [engine][checkpoint] checkpoint base classes (#2478)
a01e591a2 is described below
commit a01e591a2ad54bee2c9fee1842cf3d3c32d27b01
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Tue Aug 23 20:28:03 2022 +0800
[engine][checkpoint] checkpoint base classes (#2478)
* [engine] checkpoint base classes
* [engine][checkpoint] add checkpoint operation
* Serialization error
* [engine] fixed checkpoint operation id
* checkstyle
---
.../seatunnel/engine/server/SeaTunnelServer.java | 8 ++
.../server/checkpoint/CheckpointCoordinator.java | 52 ++++++++
.../CheckpointCoordinatorConfiguration.java | 143 +++++++++++++++++++++
.../server/checkpoint/CheckpointManager.java | 51 ++++++++
.../engine/server/checkpoint/CheckpointPlan.java | 25 ++++
.../server/checkpoint/CompletedCheckpoint.java | 72 +++++++++++
.../server/checkpoint/PendingCheckpoint.java | 72 +++++++++++
.../server/checkpoint/SubtaskStatistics.java | 53 ++++++++
.../engine/server/checkpoint/TaskStatistics.java | 81 ++++++++++++
.../server/dag/physical/PhysicalPlanGenerator.java | 7 +-
.../{PhysicalPlanUtils.java => PlanUtils.java} | 16 ++-
.../engine/server/execution/TaskInfo.java | 56 ++++++++
.../seatunnel/engine/server/master/JobMaster.java | 14 +-
...rOperation.java => CheckpointAckOperation.java} | 31 +++--
...ation.java => CheckpointFinishedOperation.java} | 31 +++--
.../operation/CheckpointTriggerOperation.java | 2 +-
...rOperation.java => TaskCompletedOperation.java} | 26 ++--
.../serializable/OperationDataSerializerHook.java | 19 ++-
.../operation/source/SourceRegisterOperation.java | 2 +-
.../seatunnel/engine/server/dag/TaskTest.java | 6 +-
20 files changed, 719 insertions(+), 48 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 3b0cf8dd6..23f7613e2 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -58,6 +58,10 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
private final SeaTunnelConfig seaTunnelConfig;
+ /**
+ * key: job id;
+ * <br> value: job master;
+ */
private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
public SeaTunnelServer(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
@@ -73,6 +77,10 @@ public class SeaTunnelServer implements ManagedService, MembershipAwareService,
return this.taskExecutionService;
}
+ public JobMaster getJobMaster(Long jobId) {
+ return runningJobMasterMap.get(jobId);
+ }
+
@Override
public void init(NodeEngine engine, Properties hzProperties) {
this.nodeEngine = (NodeEngineImpl) engine;
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
new file mode 100644
index 000000000..6046380f4
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Used to coordinate all checkpoints of a pipeline.
+ * <p>
+ * Generate and coordinate {@link Checkpoint} with a checkpoint plan
+ * </p>
+ */
+public class CheckpointCoordinator {
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+
+ private final CheckpointPlan plan;
+
+ private final Map<Long, PendingCheckpoint> pendingCheckpoints;
+
+ private final Map<Long, CompletedCheckpoint> completedCheckpoints;
+
+ private final CheckpointCoordinatorConfiguration config;
+
+ public CheckpointCoordinator(CheckpointPlan plan,
+ CheckpointCoordinatorConfiguration config) {
+ this.plan = plan;
+ this.config = config;
+ this.pendingCheckpoints = new LinkedHashMap<>();
+ this.completedCheckpoints = new LinkedHashMap<>();
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
new file mode 100644
index 000000000..bfb936831
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorConfiguration.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class CheckpointCoordinatorConfiguration implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static final long MINIMAL_CHECKPOINT_TIME = 10;
+
+ private final long checkpointInterval;
+
+ private final long checkpointTimeout;
+
+ private final int maxConcurrentCheckpoints;
+
+ private final int tolerableFailureCheckpoints;
+
+ private CheckpointCoordinatorConfiguration(long checkpointInterval,
+ long checkpointTimeout,
+ int maxConcurrentCheckpoints,
+ int tolerableFailureCheckpoints) {
+ this.checkpointInterval = checkpointInterval;
+ this.checkpointTimeout = checkpointTimeout;
+ this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+ this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
+ }
+
+ public long getCheckpointInterval() {
+ return checkpointInterval;
+ }
+
+ public long getCheckpointTimeout() {
+ return checkpointTimeout;
+ }
+
+ public int getMaxConcurrentCheckpoints() {
+ return maxConcurrentCheckpoints;
+ }
+
+ public int getTolerableFailureCheckpoints() {
+ return tolerableFailureCheckpoints;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CheckpointCoordinatorConfiguration that = (CheckpointCoordinatorConfiguration) o;
+ return checkpointInterval == that.checkpointInterval
+ && checkpointTimeout == that.checkpointTimeout
+ && maxConcurrentCheckpoints == that.maxConcurrentCheckpoints
+ && tolerableFailureCheckpoints == that.tolerableFailureCheckpoints;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ checkpointInterval,
+ checkpointTimeout,
+ maxConcurrentCheckpoints,
+ tolerableFailureCheckpoints);
+ }
+
+ @Override
+ public String toString() {
+ return "CheckpointCoordinatorConfiguration{" +
+ "checkpointInterval=" + checkpointInterval +
+ ", checkpointTimeout=" + checkpointTimeout +
+ ", maxConcurrentCheckpoints=" + maxConcurrentCheckpoints +
+ ", tolerableFailureCheckpoints=" + tolerableFailureCheckpoints +
+ '}';
+ }
+
+ public static CheckpointCoordinatorConfiguration.Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private long checkpointInterval = MINIMAL_CHECKPOINT_TIME;
+ private long checkpointTimeout = MINIMAL_CHECKPOINT_TIME;
+ private int maxConcurrentCheckpoints = 1;
+ private int tolerableFailureCheckpoints = 0;
+
+ private Builder() {
+ }
+
+ public Builder checkpointInterval(long checkpointInterval) {
+ checkArgument(checkpointInterval < MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint interval is 10 mills.");
+ this.checkpointInterval = checkpointInterval;
+ return this;
+ }
+
+ public Builder checkpointTimeout(long checkpointTimeout) {
+ checkArgument(checkpointTimeout < MINIMAL_CHECKPOINT_TIME, "The minimum checkpoint timeout is 10 mills.");
+ this.checkpointTimeout = checkpointTimeout;
+ return this;
+ }
+
+ public Builder maxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
+ checkArgument(maxConcurrentCheckpoints < 1, "The minimum number of concurrent checkpoints is 1.");
+ this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+ return this;
+ }
+
+ public Builder tolerableFailureCheckpoints(int tolerableFailureCheckpoints) {
+ checkArgument(maxConcurrentCheckpoints < 0, "The number of tolerance failed checkpoints must be a natural number.");
+ this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
+ return this;
+ }
+
+ public CheckpointCoordinatorConfiguration build() {
+ return new CheckpointCoordinatorConfiguration(
+ checkpointInterval,
+ checkpointTimeout,
+ maxConcurrentCheckpoints,
+ tolerableFailureCheckpoints);
+ }
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
new file mode 100644
index 000000000..16da70a8b
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Used to manage all checkpoints for a job.
+ * <p>
+ * Maintain the life cycle of the {@link CheckpointCoordinator} through the {@link CheckpointPlan} and the status of the job.
+ * </p>
+ */
+public class CheckpointManager {
+
+ /**
+ * key: the pipeline id of the job;
+ * <br> value: the checkpoint plan of the pipeline;
+ */
+ private final Map<Long, CheckpointPlan> checkpointPlanMap;
+
+ /**
+ * key: the pipeline id of the job;
+ * <br> value: the checkpoint coordinator of the pipeline;
+ */
+ private final Map<Long, CheckpointCoordinator> coordinatorMap;
+
+ public CheckpointManager(Map<Long, CheckpointPlan> checkpointPlanMap) {
+ this.checkpointPlanMap = checkpointPlanMap;
+ this.coordinatorMap = new HashMap<>(checkpointPlanMap.size());
+ }
+
+ public CheckpointCoordinator getCheckpointCoordinator(long pipelineId) {
+ return coordinatorMap.get(pipelineId);
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
new file mode 100644
index 000000000..6a144668d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+/**
+ * plan info
+ */
+public class CheckpointPlan {
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
new file mode 100644
index 000000000..3b56f5129
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CompletedCheckpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class CompletedCheckpoint implements Checkpoint, Serializable {
+ private static final long serialVersionUID = 1L;
+ private final long jobId;
+
+ private final long pipelineId;
+
+ private final long checkpointId;
+
+ private final long triggerTimestamp;
+
+ private final long completedTimestamp;
+
+ private final Map<Long, TaskState> taskStates;
+
+ public CompletedCheckpoint(long jobId,
+ long pipelineId,
+ long checkpointId,
+ long triggerTimestamp,
+ long completedTimestamp,
+ Map<Long, TaskState> taskStates) {
+ this.jobId = jobId;
+ this.pipelineId = pipelineId;
+ this.checkpointId = checkpointId;
+ this.triggerTimestamp = triggerTimestamp;
+ this.completedTimestamp = completedTimestamp;
+ this.taskStates = taskStates;
+ }
+
+ @Override
+ public long getCheckpointId() {
+ return this.checkpointId;
+ }
+
+ @Override
+ public long getPipelineId() {
+ return this.pipelineId;
+ }
+
+ @Override
+ public long getJobId() {
+ return this.jobId;
+ }
+
+ @Override
+ public long getCheckpointTimestamp() {
+ return this.triggerTimestamp;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
new file mode 100644
index 000000000..1c999d107
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/PendingCheckpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.core.checkpoint.Checkpoint;
+
+import java.util.Map;
+import java.util.Set;
+
+public class PendingCheckpoint implements Checkpoint {
+
+ private final long jobId;
+
+ private final long pipelineId;
+
+ private final long checkpointId;
+
+ private final long triggerTimestamp;
+
+ private final Set<Long> notYetAcknowledgedTasks;
+
+ private final Map<Long, TaskState> taskStates;
+
+ public PendingCheckpoint(long jobId,
+ long pipelineId,
+ long checkpointId,
+ long triggerTimestamp,
+ Set<Long> notYetAcknowledgedTasks,
+ Map<Long, TaskState> taskStates) {
+ this.jobId = jobId;
+ this.pipelineId = pipelineId;
+ this.checkpointId = checkpointId;
+ this.triggerTimestamp = triggerTimestamp;
+ this.notYetAcknowledgedTasks = notYetAcknowledgedTasks;
+ this.taskStates = taskStates;
+ }
+
+ @Override
+ public long getCheckpointId() {
+ return this.checkpointId;
+ }
+
+ @Override
+ public long getPipelineId() {
+ return this.pipelineId;
+ }
+
+ @Override
+ public long getJobId() {
+ return this.jobId;
+ }
+
+ @Override
+ public long getCheckpointTimestamp() {
+ return this.triggerTimestamp;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
new file mode 100644
index 000000000..d098c1783
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/SubtaskStatistics.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import java.io.Serializable;
+
+public class SubtaskStatistics implements Serializable {
+
+ private static final long serialVersionUID = 8928594531621862214L;
+
+ private final int subtaskIndex;
+
+ /** Timestamp when the ack from this subtask was received at the coordinator. */
+ private final long ackTimestamp;
+
+ /** Size of the checkpointed state at this subtask. */
+ private final long stateSize;
+
+ public SubtaskStatistics(int subtaskIndex,
+ long ackTimestamp,
+ long stateSize) {
+ this.subtaskIndex = subtaskIndex;
+ this.ackTimestamp = ackTimestamp;
+ this.stateSize = stateSize;
+ }
+
+ public int getSubtaskIndex() {
+ return subtaskIndex;
+ }
+
+ public long getAckTimestamp() {
+ return ackTimestamp;
+ }
+
+ public long getStateSize() {
+ return stateSize;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
new file mode 100644
index 000000000..4956c9d1d
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskStatistics.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class TaskStatistics {
+ /** ID of the task the statistics belong to. */
+ private final Long jobVertexId;
+
+ private final SubtaskStatistics[] subtaskStats;
+
+ private int numAcknowledgedSubtasks;
+
+ private SubtaskStatistics latestAckedSubtaskStatistics;
+
+ TaskStatistics(Long jobVertexId, int parallelism) {
+ this.jobVertexId = checkNotNull(jobVertexId, "JobVertexID");
+ checkArgument(parallelism > 0, "the parallelism of task <= 0");
+ this.subtaskStats = new SubtaskStatistics[parallelism];
+ }
+
+ boolean reportSubtaskStats(SubtaskStatistics subtask) {
+ checkNotNull(subtask, "Subtask stats");
+ int subtaskIndex = subtask.getSubtaskIndex();
+
+ if (subtaskIndex < 0 || subtaskIndex >= subtaskStats.length) {
+ return false;
+ }
+
+ if (subtaskStats[subtaskIndex] == null) {
+ subtaskStats[subtaskIndex] = subtask;
+ numAcknowledgedSubtasks++;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * @return The latest acknowledged subtask stats or <code>null</code> if none was acknowledged
+ * yet.
+ */
+ public SubtaskStatistics getLatestAcknowledgedSubtaskStatistics() {
+ return latestAckedSubtaskStatistics;
+ }
+
+ /**
+ * @return Ack timestamp of the latest acknowledged subtask or <code>-1</code> if none was
+ * acknowledged yet..
+ */
+ public long getLatestAckTimestamp() {
+ return latestAckedSubtaskStatistics != null ?
+ latestAckedSubtaskStatistics.getAckTimestamp() :
+ -1;
+ }
+
+ public Long getJobVertexId() {
+ return jobVertexId;
+ }
+
+ public SubtaskStatistics[] getSubtaskStats() {
+ return subtaskStats;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index a2b71303d..0f5dcdb52 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineState;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionEdge;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
@@ -52,6 +53,7 @@ import org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQu
import com.google.common.collect.Lists;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.NonNull;
@@ -111,7 +113,7 @@ public class PhysicalPlanGenerator {
this.flakeIdGenerator = flakeIdGenerator;
}
- public PhysicalPlan generate() {
+ public Tuple2<PhysicalPlan, CheckpointPlan> generate() {
// TODO Determine which tasks do not need to be restored according to state
CopyOnWriteArrayList<PassiveCompletableFuture<PipelineState>> waitForCompleteBySubPlanList =
@@ -151,11 +153,12 @@ public class PhysicalPlanGenerator {
jobImmutableInformation);
});
- return new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
+ PhysicalPlan physicalPlan = new PhysicalPlan(subPlanStream.collect(Collectors.toList()),
executorService,
jobImmutableInformation,
initializationTimestamp,
waitForCompleteBySubPlanList.toArray(new PassiveCompletableFuture[waitForCompleteBySubPlanList.size()]));
+ return Tuple2.tuple2(physicalPlan, null);
}
private List<SourceAction<?, ?, ?>> findSourceAction(List<ExecutionEdge> edges) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
similarity index 66%
rename from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
rename to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
index 1acc21671..fcc24c6f9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
@@ -19,22 +19,24 @@ package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
+import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.NonNull;
import java.util.concurrent.ExecutorService;
-public class PhysicalPlanUtils {
+public class PlanUtils {
- public static PhysicalPlan fromLogicalDAG(@NonNull LogicalDag logicalDag,
- @NonNull NodeEngine nodeEngine,
- @NonNull JobImmutableInformation jobImmutableInformation,
- long initializationTimestamp,
- @NonNull ExecutorService executorService,
- @NonNull FlakeIdGenerator flakeIdGenerator) {
+ public static Tuple2<PhysicalPlan, CheckpointPlan> fromLogicalDAG(@NonNull LogicalDag logicalDag,
+ @NonNull NodeEngine nodeEngine,
+ @NonNull JobImmutableInformation jobImmutableInformation,
+ long initializationTimestamp,
+ @NonNull ExecutorService executorService,
+ @NonNull FlakeIdGenerator flakeIdGenerator) {
return new PhysicalPlanGenerator(
new ExecutionPlanGenerator(logicalDag, jobImmutableInformation, initializationTimestamp).generate(),
nodeEngine,
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskInfo.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskInfo.java
new file mode 100644
index 000000000..fdec3ad8f
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.execution;
+
+import java.io.Serializable;
+
+public class TaskInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Long jobId;
+
+ private final Long pipelineId;
+
+ private final Long jobVertexId;
+
+ private final Integer index;
+
+ public TaskInfo(Long jobId, Long pipelineId, Long jobVertexId, Integer index) {
+ this.jobId = jobId;
+ this.pipelineId = pipelineId;
+ this.jobVertexId = jobVertexId;
+ this.index = index;
+ }
+
+ public Long getJobId() {
+ return jobId;
+ }
+
+ public Long getPipelineId() {
+ return pipelineId;
+ }
+
+ public Long getJobVertexId() {
+ return jobVertexId;
+ }
+
+ public Integer getIndex() {
+ return index;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 1cdc0b5f7..3af7ebe12 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -23,8 +23,10 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointManager;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
-import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
+import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.SimpleResourceManager;
import org.apache.seatunnel.engine.server.scheduler.JobScheduler;
@@ -32,6 +34,7 @@ import org.apache.seatunnel.engine.server.scheduler.PipelineBaseScheduler;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngine;
@@ -55,6 +58,8 @@ public class JobMaster implements Runnable {
private ResourceManager resourceManager;
+ private CheckpointManager checkpointManager;
+
private CompletableFuture<JobStatus> jobMasterCompleteFuture = new CompletableFuture<>();
private JobImmutableInformation jobImmutableInformation;
@@ -80,12 +85,13 @@ public class JobMaster implements Runnable {
// TODO Use classloader load the connector jars and deserialize logicalDag
this.logicalDag = nodeEngine.getSerializationService().toObject(jobImmutableInformation.getLogicalDag());
- physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag,
+ final Tuple2<PhysicalPlan, CheckpointPlan> planTuple = PlanUtils.fromLogicalDAG(logicalDag,
nodeEngine,
jobImmutableInformation,
System.currentTimeMillis(),
executorService,
flakeIdGenerator);
+ physicalPlan = planTuple.f0();
}
@SuppressWarnings("checkstyle:MagicNumber")
@@ -126,6 +132,10 @@ public class JobMaster implements Runnable {
return resourceManager;
}
+ public CheckpointManager getCheckpointManager() {
+ return checkpointManager;
+ }
+
public PassiveCompletableFuture<JobStatus> getJobMasterCompleteFuture() {
return new PassiveCompletableFuture<>(jobMasterCompleteFuture);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointAckOperation.java
similarity index 60%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointAckOperation.java
index 6de6b3f75..6947b2fad 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointAckOperation.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.engine.server.operation;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator;
+import org.apache.seatunnel.engine.server.execution.TaskInfo;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
@@ -26,34 +28,43 @@ import com.hazelcast.nio.ObjectDataOutput;
import java.io.IOException;
-public class CheckpointTriggerOperation extends AsyncOperation {
- private CheckpointBarrier checkpointBarrier;
+public class CheckpointAckOperation extends AsyncOperation {
+ private TaskInfo taskInfo;
- public CheckpointTriggerOperation() {
+ private byte[] states;
+
+ public CheckpointAckOperation() {
}
- public CheckpointTriggerOperation(CheckpointBarrier checkpointBarrier) {
- this.checkpointBarrier = checkpointBarrier;
+ public CheckpointAckOperation(TaskInfo taskInfo, byte[] states) {
+ this.taskInfo = taskInfo;
+ this.states = states;
}
@Override
public int getClassId() {
- return OperationDataSerializerHook.SUBMIT_OPERATOR;
+ return OperationDataSerializerHook.CHECKPOINT_ACK_OPERATOR;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
- out.writeObject(checkpointBarrier);
+ out.writeObject(taskInfo);
+ out.writeByteArray(states);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
- checkpointBarrier = in.readObject(CheckpointBarrier.class);
+ taskInfo = in.readObject(TaskInfo.class);
+ states = in.readByteArray();
}
@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
- // TODO: All source Vertexes executed
+ CheckpointCoordinator checkpointCoordinator = ((SeaTunnelServer) getService())
+ .getJobMaster(taskInfo.getJobId())
+ .getCheckpointManager()
+ .getCheckpointCoordinator(taskInfo.getPipelineId());
+ // TODO: notify coordinator
return null;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointFinishedOperation.java
similarity index 64%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointFinishedOperation.java
index 6de6b3f75..2d061482e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointFinishedOperation.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.engine.server.operation;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
@@ -26,34 +25,46 @@ import com.hazelcast.nio.ObjectDataOutput;
import java.io.IOException;
-public class CheckpointTriggerOperation extends AsyncOperation {
- private CheckpointBarrier checkpointBarrier;
+public class CheckpointFinishedOperation extends AsyncOperation {
+ private long jobId;
- public CheckpointTriggerOperation() {
+ private long pipelineId;
+
+ private long checkpointId;
+
+ public CheckpointFinishedOperation() {
}
- public CheckpointTriggerOperation(CheckpointBarrier checkpointBarrier) {
- this.checkpointBarrier = checkpointBarrier;
+ public CheckpointFinishedOperation(long jobId,
+ long pipelineId,
+ long checkpointId) {
+ this.jobId = jobId;
+ this.pipelineId = pipelineId;
+ this.checkpointId = checkpointId;
}
@Override
public int getClassId() {
- return OperationDataSerializerHook.SUBMIT_OPERATOR;
+ return OperationDataSerializerHook.CHECKPOINT_FINISHED_OPERATOR;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
- out.writeObject(checkpointBarrier);
+ out.writeLong(jobId);
+ out.writeLong(pipelineId);
+ out.writeLong(checkpointId);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
- checkpointBarrier = in.readObject(CheckpointBarrier.class);
+ jobId = in.readLong();
+ pipelineId = in.readLong();
+ checkpointId = in.readLong();
}
@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
- // TODO: All source Vertexes executed
+ // TODO: Notifies all tasks of the pipeline about the status of the checkpoint
return null;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
index 6de6b3f75..235cd010e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
@@ -38,7 +38,7 @@ public class CheckpointTriggerOperation extends AsyncOperation {
@Override
public int getClassId() {
- return OperationDataSerializerHook.SUBMIT_OPERATOR;
+ return OperationDataSerializerHook.CHECKPOINT_TRIGGER_OPERATOR;
}
@Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/TaskCompletedOperation.java
similarity index 64%
copy from seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
copy to seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/TaskCompletedOperation.java
index 6de6b3f75..fbca114c1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/TaskCompletedOperation.java
@@ -18,7 +18,9 @@
package org.apache.seatunnel.engine.server.operation;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator;
+import org.apache.seatunnel.engine.server.execution.TaskInfo;
import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
@@ -26,34 +28,38 @@ import com.hazelcast.nio.ObjectDataOutput;
import java.io.IOException;
-public class CheckpointTriggerOperation extends AsyncOperation {
- private CheckpointBarrier checkpointBarrier;
+public class TaskCompletedOperation extends AsyncOperation {
+ private TaskInfo taskInfo;
- public CheckpointTriggerOperation() {
+ public TaskCompletedOperation() {
}
- public CheckpointTriggerOperation(CheckpointBarrier checkpointBarrier) {
- this.checkpointBarrier = checkpointBarrier;
+ public TaskCompletedOperation(TaskInfo taskInfo) {
+ this.taskInfo = taskInfo;
}
@Override
public int getClassId() {
- return OperationDataSerializerHook.SUBMIT_OPERATOR;
+ return OperationDataSerializerHook.TASK_COMPLETED_OPERATOR;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
- out.writeObject(checkpointBarrier);
+ out.writeObject(taskInfo);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
- checkpointBarrier = in.readObject(CheckpointBarrier.class);
+ taskInfo = in.readObject(TaskInfo.class);
}
@Override
protected PassiveCompletableFuture<?> doRun() throws Exception {
- // TODO: All source Vertexes executed
+ CheckpointCoordinator checkpointCoordinator = ((SeaTunnelServer) getService())
+ .getJobMaster(taskInfo.getJobId())
+ .getCheckpointManager()
+ .getCheckpointCoordinator(taskInfo.getPipelineId());
+ // TODO: notify coordinator
return null;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index d5ade4d59..2a40f871b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -18,10 +18,13 @@
package org.apache.seatunnel.engine.server.serializable;
import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.operation.CheckpointAckOperation;
+import org.apache.seatunnel.engine.server.operation.CheckpointFinishedOperation;
import org.apache.seatunnel.engine.server.operation.CheckpointTriggerOperation;
import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
+import org.apache.seatunnel.engine.server.operation.TaskCompletedOperation;
import org.apache.seatunnel.engine.server.operation.WaitForJobCompleteOperation;
import com.hazelcast.internal.serialization.DataSerializerHook;
@@ -40,9 +43,15 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
public static final int PRINT_MESSAGE_OPERATOR = 0;
public static final int SUBMIT_OPERATOR = 1;
public static final int DEPLOY_TASK_OPERATOR = 2;
- public static final int WAIT_FORM_JOB_COMPLETE_OPERATOR = 3;
- public static final int CHECKPOINT_TRIGGER_OPERATOR = 4;
+ public static final int TASK_COMPLETED_OPERATOR = 3;
+ public static final int WAIT_FORM_JOB_COMPLETE_OPERATOR = 4;
+
+ public static final int CHECKPOINT_TRIGGER_OPERATOR = 5;
+
+ public static final int CHECKPOINT_ACK_OPERATOR = 6;
+
+ public static final int CHECKPOINT_FINISHED_OPERATOR = 7;
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -70,10 +79,16 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
return new SubmitJobOperation();
case DEPLOY_TASK_OPERATOR:
return new DeployTaskOperation();
+ case TASK_COMPLETED_OPERATOR:
+ return new TaskCompletedOperation();
case WAIT_FORM_JOB_COMPLETE_OPERATOR:
return new WaitForJobCompleteOperation();
case CHECKPOINT_TRIGGER_OPERATOR:
return new CheckpointTriggerOperation();
+ case CHECKPOINT_ACK_OPERATOR:
+ return new CheckpointAckOperation();
+ case CHECKPOINT_FINISHED_OPERATOR:
+ return new CheckpointFinishedOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index e570aeb0c..bca5615c8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -61,7 +61,7 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
task.receivedReader(readerTaskID, readerAddress);
return null;
}, new RetryUtils.RetryMaterial(RETRY_TIME, true,
- exception -> exception instanceof NullPointerException, RETRY_TIME_OUT));
+ exception -> exception instanceof NullPointerException, RETRY_TIME_OUT));
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 5c78af388..b5f43ad3b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -36,7 +36,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
-import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlanUtils;
+import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
@@ -145,11 +145,11 @@ public class TaskTest {
JobImmutableInformation jobImmutableInformation = new JobImmutableInformation(1,
nodeEngine.getSerializationService().toData(logicalDag), config, Collections.emptyList());
- PhysicalPlan physicalPlan = PhysicalPlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
+ PhysicalPlan physicalPlan = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine,
jobImmutableInformation,
System.currentTimeMillis(),
Executors.newCachedThreadPool(),
- instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME));
+ instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME)).f0();
Assert.assertEquals(physicalPlan.getPipelineList().size(), 1);
Assert.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(), 1);