You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/18 07:03:13 UTC
[incubator-seatunnel] branch st-engine updated: [engine][checkpoint] checkpoint base interface (#2448)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 5b09e9c1b [engine][checkpoint] checkpoint base interface (#2448)
5b09e9c1b is described below
commit 5b09e9c1bde47a8a514f3634121b3a2df6a9b74e
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Aug 18 15:03:08 2022 +0800
[engine][checkpoint] checkpoint base interface (#2448)
* [engine][checkpoint] checkpoint base interface
# Conflicts:
# seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
* [engine] fix checkpoint trigger operation
---
.../engine/core/checkpoint/Checkpoint.java | 30 +++++++++
.../engine/core/checkpoint/CheckpointBarrier.java | 78 ++++++++++++++++++++++
.../core/checkpoint/CheckpointIDCounter.java | 61 +++++++++++++++++
.../engine/core/checkpoint/CheckpointListener.java | 40 +++++++++++
.../engine/core/checkpoint/CheckpointType.java | 39 +++++++++++
.../engine/server/checkpoint/TaskState.java | 54 +++++++++++++++
.../operation/CheckpointTriggerOperation.java | 59 ++++++++++++++++
.../serializable/OperationDataSerializerHook.java | 5 ++
8 files changed, 366 insertions(+)
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/Checkpoint.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/Checkpoint.java
new file mode 100644
index 000000000..6f6e5d4e9
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/Checkpoint.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.checkpoint;
+
+/** A checkpoint, pending or completed. */
+public interface Checkpoint {
+
+ long getCheckpointId();
+
+ long getPipelineId();
+
+ long getJobId();
+
+ long getCheckpointTimestamp();
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointBarrier.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointBarrier.java
new file mode 100644
index 000000000..4e6cf5600
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointBarrier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.checkpoint;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Objects;
+
+import java.io.Serializable;
+
+public class CheckpointBarrier implements Serializable {
+ private final long id;
+ private final long timestamp;
+ private final CheckpointType checkpointType;
+
+ public CheckpointBarrier(long id, long timestamp, CheckpointType checkpointType) {
+ this.id = id;
+ this.timestamp = timestamp;
+ this.checkpointType = checkNotNull(checkpointType);
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public CheckpointType getCheckpointType() {
+ return checkpointType;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id, timestamp, checkpointType);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == this) {
+ return true;
+ } else if (other == null || other.getClass() != CheckpointBarrier.class) {
+ return false;
+ } else {
+ CheckpointBarrier that = (CheckpointBarrier) other;
+ return that.id == this.id
+ && that.timestamp == this.timestamp
+ && this.checkpointType.equals(that.checkpointType);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "CheckpointBarrier %d @ %d Options: %s", id, timestamp, checkpointType);
+ }
+
+ public boolean isAuto() {
+ return checkpointType.isAuto();
+ }
+
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
new file mode 100644
index 000000000..5fdc88ead
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.checkpoint;
+
+import java.util.concurrent.CompletableFuture;
+
+/** A checkpoint ID counter. */
+public interface CheckpointIDCounter {
+
+ int INITIAL_CHECKPOINT_ID = 1;
+
+ /** Starts the {@link CheckpointIDCounter} service. */
+ void start() throws Exception;
+
+ /**
+ * Shuts the {@link CheckpointIDCounter} service down.
+ *
+ * <p>The job status is forwarded and used to decide whether state should actually be discarded
+ * or kept.
+ *
+ * @return The {@code CompletableFuture} holding the result of the shutdown operation.
+ */
+ CompletableFuture<Void> shutdown();
+
+ /**
+ * Atomically increments the current checkpoint ID.
+ *
+ * @return The previous checkpoint ID
+ */
+ long getAndIncrement() throws Exception;
+
+ /**
+ * Atomically gets the current checkpoint ID.
+ *
+ * @return The current checkpoint ID
+ */
+ long get();
+
+ /**
+ * Sets the current checkpoint ID.
+ *
+ * @param newId The new ID
+ */
+ void setCount(long newId) throws Exception;
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointListener.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointListener.java
new file mode 100644
index 000000000..e0fe3980c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.checkpoint;
+
+public interface CheckpointListener {
+
+ /**
+ * Notifies the listener that the checkpoint with the given {@code checkpointId} completed and
+ * was committed.
+ *
+ * @param checkpointId The ID of the checkpoint that has been completed.
+ * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+ * the task. Note that this will NOT lead to the checkpoint being revoked.
+ */
+ void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+ /**
+ * This method is called as a notification once a distributed checkpoint has been aborted.
+ *
+ * @param checkpointId The ID of the checkpoint that has been aborted.
+ * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+ * the task or job.
+ */
+ default void notifyCheckpointAborted(long checkpointId) throws Exception {}
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
new file mode 100644
index 000000000..485f711d7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.checkpoint;
+
+public enum CheckpointType {
+ CHECKPOINT_TYPE(true, "checkpoint"),
+ SAVEPOINT_TYPE(false, "savepoint");
+
+ private final boolean auto;
+ private final String name;
+
+ CheckpointType(boolean auto, String name) {
+ this.auto = auto;
+ this.name = name;
+ }
+
+ public boolean isAuto() {
+ return auto;
+ }
+
+ public String getName() {
+ return name;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskState.java
new file mode 100644
index 000000000..e3a1e1b22
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TaskState implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The id of the task. */
+ private final String taskId;
+
+ /** The handles to states created by the parallel tasks: subtaskIndex -> subtask state. */
+ private final Map<Integer, byte[]> subtaskStates;
+
+ /** The parallelism of the operator when it was checkpointed. */
+ private final int parallelism;
+
+ public TaskState(String taskId, int parallelism) {
+ this.taskId = taskId;
+ this.subtaskStates = new HashMap<>(parallelism);
+ this.parallelism = parallelism;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public Map<Integer, byte[]> getSubtaskStates() {
+ return subtaskStates;
+ }
+
+ public int getParallelism() {
+ return parallelism;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
new file mode 100644
index 000000000..d66a5d66c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+import java.io.IOException;
+
+public class CheckpointTriggerOperation extends AsyncOperation {
+ private CheckpointBarrier checkpointBarrier;
+
+ public CheckpointTriggerOperation() {
+ }
+
+ public CheckpointTriggerOperation(CheckpointBarrier checkpointBarrier) {
+ this.checkpointBarrier = checkpointBarrier;
+ }
+
+ @Override
+ public int getClassId() {
+ return OperationDataSerializerHook.CHECKPOINT_TRIGGER_OPERATOR;
+ }
+
+ @Override
+ protected void writeInternal(ObjectDataOutput out) throws IOException {
+ out.writeObject(checkpointBarrier);
+ }
+
+ @Override
+ protected void readInternal(ObjectDataInput in) throws IOException {
+ checkpointBarrier = in.readObject(CheckpointBarrier.class);
+ }
+
+ @Override
+ protected NonCompletableFuture<?> doRun() throws Exception {
+ // TODO: All source Vertexes executed
+ return null;
+ }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 07db59e4e..575b4bfa1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.serializable;
import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.operation.CheckpointTriggerOperation;
import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
@@ -39,6 +40,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
public static final int SUBMIT_OPERATOR = 1;
public static final int DEPLOY_TASK_OPERATOR = 2;
+ public static final int CHECKPOINT_TRIGGER_OPERATOR = 3;
+
public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
@@ -65,6 +68,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
return new SubmitJobOperation();
case DEPLOY_TASK_OPERATOR:
return new DeployTaskOperation();
+ case CHECKPOINT_TRIGGER_OPERATOR:
+ return new CheckpointTriggerOperation();
default:
throw new IllegalArgumentException("Unknown type id " + typeId);
}