You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/08/18 07:03:13 UTC

[incubator-seatunnel] branch st-engine updated: [engine][checkpoint] checkpoint base interface (#2448)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new 5b09e9c1b [engine][checkpoint] checkpoint base interface (#2448)
5b09e9c1b is described below

commit 5b09e9c1bde47a8a514f3634121b3a2df6a9b74e
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Thu Aug 18 15:03:08 2022 +0800

    [engine][checkpoint] checkpoint base interface (#2448)
    
    * [engine][checkpoint] checkpoint base interface
    
    # Conflicts:
    #       seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
    
    * [engine] fix checkpoint trigger operation
---
 .../engine/core/checkpoint/Checkpoint.java         | 30 +++++++++
 .../engine/core/checkpoint/CheckpointBarrier.java  | 78 ++++++++++++++++++++++
 .../core/checkpoint/CheckpointIDCounter.java       | 61 +++++++++++++++++
 .../engine/core/checkpoint/CheckpointListener.java | 40 +++++++++++
 .../engine/core/checkpoint/CheckpointType.java     | 39 +++++++++++
 .../engine/server/checkpoint/TaskState.java        | 54 +++++++++++++++
 .../operation/CheckpointTriggerOperation.java      | 59 ++++++++++++++++
 .../serializable/OperationDataSerializerHook.java  |  5 ++
 8 files changed, 366 insertions(+)

diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/Checkpoint.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/Checkpoint.java
new file mode 100644
index 000000000..6f6e5d4e9
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/Checkpoint.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.checkpoint;
+
+/**
+ * A checkpoint of a pipeline belonging to a job. Implementations may represent either a
+ * checkpoint that is still pending or one that has already completed.
+ */
+public interface Checkpoint {
+
+    /** @return the id of the job this checkpoint was taken for. */
+    long getJobId();
+
+    /** @return the id of the pipeline (within the job) this checkpoint was taken for. */
+    long getPipelineId();
+
+    /** @return the id of this checkpoint. */
+    long getCheckpointId();
+
+    /** @return the timestamp recorded for this checkpoint. */
+    long getCheckpointTimestamp();
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointBarrier.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointBarrier.java
new file mode 100644
index 000000000..4e6cf5600
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointBarrier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.checkpoint;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Objects;
+
+import java.io.Serializable;
+
+/**
+ * Immutable description of a single checkpoint: its id, its timestamp and its {@link
+ * CheckpointType}.
+ *
+ * <p>The class is {@link Serializable} because it is shipped between members, e.g. as the payload
+ * written by {@code CheckpointTriggerOperation}. NOTE(review): presumably this barrier is meant to
+ * be injected into the data stream by source tasks — confirm once the trigger logic is written.
+ */
+public class CheckpointBarrier implements Serializable {
+
+    /** Pinned explicitly so the serialized form does not depend on the compiler-computed UID. */
+    private static final long serialVersionUID = 1L;
+
+    /** Id of the checkpoint this barrier belongs to. */
+    private final long id;
+
+    /** Timestamp supplied by the creator (unit presumably epoch millis — TODO confirm). */
+    private final long timestamp;
+
+    /** Kind of checkpoint; never {@code null}. */
+    private final CheckpointType checkpointType;
+
+    /**
+     * @param id checkpoint id
+     * @param timestamp checkpoint timestamp
+     * @param checkpointType kind of checkpoint, must not be {@code null}
+     * @throws NullPointerException if {@code checkpointType} is {@code null}
+     */
+    public CheckpointBarrier(long id, long timestamp, CheckpointType checkpointType) {
+        this.id = id;
+        this.timestamp = timestamp;
+        this.checkpointType = checkNotNull(checkpointType, "checkpointType");
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public CheckpointType getCheckpointType() {
+        return checkpointType;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(id, timestamp, checkpointType);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        // Exact-class comparison: a subclass instance is never equal to a CheckpointBarrier.
+        if (other == null || other.getClass() != CheckpointBarrier.class) {
+            return false;
+        }
+        CheckpointBarrier that = (CheckpointBarrier) other;
+        return that.id == this.id
+                && that.timestamp == this.timestamp
+                && this.checkpointType.equals(that.checkpointType);
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "CheckpointBarrier %d @ %d Options: %s", id, timestamp, checkpointType);
+    }
+
+    /** @return {@code true} if this barrier's {@link CheckpointType} is automatically triggered. */
+    public boolean isAuto() {
+        return checkpointType.isAuto();
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
new file mode 100644
index 000000000..5fdc88ead
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointIDCounter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.checkpoint;
+
+import java.util.concurrent.CompletableFuture;
+
+/** A counter that hands out checkpoint IDs. */
+public interface CheckpointIDCounter {
+
+    /**
+     * The first checkpoint ID; presumably the value a freshly started counter begins at (confirm
+     * against implementations). Declared as {@code int}, although IDs are handled as {@code long}.
+     */
+    int INITIAL_CHECKPOINT_ID = 1;
+
+    /**
+     * Starts the {@link CheckpointIDCounter} service.
+     *
+     * @throws Exception if the counter cannot be started.
+     */
+    void start() throws Exception;
+
+    /**
+     * Shuts the {@link CheckpointIDCounter} service down.
+     *
+     * @return The {@code CompletableFuture} holding the result of the shutdown operation.
+     */
+    CompletableFuture<Void> shutdown();
+
+    /**
+     * Atomically increments the current checkpoint ID.
+     *
+     * @return The previous checkpoint ID, i.e. the value before the increment.
+     * @throws Exception if the counter cannot be read or updated.
+     */
+    long getAndIncrement() throws Exception;
+
+    /**
+     * Atomically gets the current checkpoint ID.
+     *
+     * @return The current checkpoint ID.
+     */
+    long get();
+
+    /**
+     * Sets the current checkpoint ID.
+     *
+     * @param newId The new ID.
+     * @throws Exception if the counter cannot be updated.
+     */
+    void setCount(long newId) throws Exception;
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointListener.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointListener.java
new file mode 100644
index 000000000..e0fe3980c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.checkpoint;
+
+public interface CheckpointListener {
+
+    /**
+     * Notifies the listener that the checkpoint with the given {@code checkpointId} completed and
+     * was committed.
+     *
+     * @param checkpointId The ID of the checkpoint that has been completed.
+     * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+     *     the task. Note that this will NOT lead to the checkpoint being revoked.
+     */
+    void notifyCheckpointComplete(long checkpointId) throws Exception;
+
+    /**
+     * This method is called as a notification once a distributed checkpoint has been aborted.
+     *
+     * @param checkpointId The ID of the checkpoint that has been aborted.
+     * @throws Exception This method can propagate exceptions, which leads to a failure/recovery for
+     *     the task or job.
+     */
+    default void notifyCheckpointAborted(long checkpointId) throws Exception {}
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
new file mode 100644
index 000000000..485f711d7
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.checkpoint;
+
+/**
+ * Kind of snapshot. {@link #isAuto()} separates automatically triggered checkpoints from
+ * savepoints (which are presumably triggered manually — confirm).
+ */
+public enum CheckpointType {
+    /** Regular checkpoint; {@link #isAuto()} is {@code true}. */
+    CHECKPOINT_TYPE(true, "checkpoint"),
+
+    /** Savepoint; {@link #isAuto()} is {@code false}. */
+    SAVEPOINT_TYPE(false, "savepoint");
+
+    /** Lower-case human-readable label of this type. */
+    private final String label;
+
+    /** Whether snapshots of this type are triggered automatically. */
+    private final boolean automatic;
+
+    CheckpointType(boolean automatic, String label) {
+        this.label = label;
+        this.automatic = automatic;
+    }
+
+    /** @return {@code true} if snapshots of this type are triggered automatically. */
+    public boolean isAuto() {
+        return this.automatic;
+    }
+
+    /** @return the lower-case label of this type, e.g. {@code "checkpoint"}. */
+    public String getName() {
+        return this.label;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskState.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskState.java
new file mode 100644
index 000000000..e3a1e1b22
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/TaskState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Checkpointed state of one task: one serialized state blob per subtask index.
+ *
+ * <p>{@link #getSubtaskStates()} returns the live, mutable internal map rather than a copy.
+ */
+public class TaskState implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Identifier of the task owning this state. */
+    private final String taskId;
+
+    /** Parallelism of the task at the time it was checkpointed. */
+    private final int parallelism;
+
+    /** Serialized subtask states, keyed by subtask index. */
+    private final Map<Integer, byte[]> subtaskStates;
+
+    public TaskState(String taskId, int parallelism) {
+        this.taskId = taskId;
+        this.parallelism = parallelism;
+        // Initial capacity follows the parallelism; presumably one entry per subtask is expected.
+        this.subtaskStates = new HashMap<>(parallelism);
+    }
+
+    public String getTaskId() {
+        return this.taskId;
+    }
+
+    public int getParallelism() {
+        return this.parallelism;
+    }
+
+    public Map<Integer, byte[]> getSubtaskStates() {
+        return this.subtaskStates;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
new file mode 100644
index 000000000..d66a5d66c
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/CheckpointTriggerOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.core.checkpoint.CheckpointBarrier;
+import org.apache.seatunnel.engine.server.serializable.OperationDataSerializerHook;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+
+import java.io.IOException;
+
+/**
+ * Operation carrying a {@link CheckpointBarrier}, intended to trigger a checkpoint on the member
+ * it is sent to.
+ *
+ * <p>The type id returned by {@link #getClassId()} must match the case that
+ * {@link OperationDataSerializerHook} maps back to this class, otherwise the receiver instantiates
+ * the wrong operation type.
+ */
+public class CheckpointTriggerOperation extends AsyncOperation {
+    private CheckpointBarrier checkpointBarrier;
+
+    /** No-arg constructor used by {@link OperationDataSerializerHook} when deserializing. */
+    public CheckpointTriggerOperation() {
+    }
+
+    public CheckpointTriggerOperation(CheckpointBarrier checkpointBarrier) {
+        this.checkpointBarrier = checkpointBarrier;
+    }
+
+    @Override
+    public int getClassId() {
+        // Previously returned SUBMIT_OPERATOR, which made the receiving side deserialize this
+        // operation as a SubmitJobOperation.
+        return OperationDataSerializerHook.CHECKPOINT_TRIGGER_OPERATOR;
+    }
+
+    @Override
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        out.writeObject(checkpointBarrier);
+    }
+
+    @Override
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        checkpointBarrier = in.readObject(CheckpointBarrier.class);
+    }
+
+    @Override
+    protected NonCompletableFuture<?> doRun() throws Exception {
+        // TODO: All source Vertexes executed
+        // NOTE(review): returns null until implemented; confirm AsyncOperation tolerates a null
+        // future.
+        return null;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
index 07db59e4e..575b4bfa1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/OperationDataSerializerHook.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.serializable;
 
 import org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
+import org.apache.seatunnel.engine.server.operation.CheckpointTriggerOperation;
 import org.apache.seatunnel.engine.server.operation.DeployTaskOperation;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
 import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
@@ -39,6 +40,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
     public static final int SUBMIT_OPERATOR = 1;
     public static final int DEPLOY_TASK_OPERATOR = 2;
 
+    public static final int CHECKPOINT_TRIGGER_OPERATOR = 3;
+
     public static final int FACTORY_ID = FactoryIdHelper.getFactoryId(
         SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
         SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY_ID
@@ -65,6 +68,8 @@ public final class OperationDataSerializerHook implements DataSerializerHook {
                     return new SubmitJobOperation();
                 case DEPLOY_TASK_OPERATOR:
                     return new DeployTaskOperation();
+                case CHECKPOINT_TRIGGER_OPERATOR:
+                    return new CheckpointTriggerOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + typeId);
             }