Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/18 15:39:02 UTC

[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-26029][Runtime/Checkpointing] Generalize the checkpoint protocol of OperatorCoordinator

lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r923461372


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CloseableSubtaskGateway.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway}
+ * that can be temporarily closed, blocking events from going through, buffering them, and releasing
+ * them later. It is used for "alignment" of operator event streams with checkpoint barrier
+ * injection, similar to how the input channels are aligned during a common checkpoint.
+ *
+ * <p>This class is NOT thread safe, but assumed to be used in a single threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox components like RPC
+ * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
+ */
+class CloseableSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+    /** The wrapped gateway that actually performs the event-sending operation. */
+    private final OperatorCoordinator.SubtaskGateway innerGateway;
+
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+    private long currentCheckpointId;
+
+    private long lastCheckpointId;
+
+    private boolean isClosed;
+
+    @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
+
+    CloseableSubtaskGateway(OperatorCoordinator.SubtaskGateway innerGateway) {
+        this.innerGateway = innerGateway;
+        this.currentCheckpointId = NO_CHECKPOINT;
+        this.lastCheckpointId = Long.MIN_VALUE;
+    }
+
+    void setMainThreadExecutorForValidation(ComponentMainThreadExecutor mainThreadExecutor) {
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
+    /**
+     * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+     * allow closing the gateway for this specific checkpoint.
+     *
+     * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+     * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+     * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+     * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+     * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+     */
+    void markForCheckpoint(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
+        }
+    }
+
+    /**
+     * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+     * re-opened. If the gateway is already closed, this does nothing.
+     *
+     * @return True if the gateway is closed, false if the checkpointId is incorrect.
+     */
+    boolean tryCloseGateway(long checkpointId) {

Review Comment:
   According to the Java doc of this class, `This class is NOT thread safe, but assumed to be used in a single threaded context`. Should we also invoke `checkRunsInMainThread()` in methods such as `tryCloseGateway()` and `sendEvent()`?
   
   I assume we want to make sure that `isClosed` and `blockedEvents` won't be accessed concurrently.
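
A minimal sketch of what such a guard could look like, assuming a simplified stand-in for the gateway. `ConfinedGatewaySketch`, its `String` events and the `rejectedFromOtherThread` helper are hypothetical and not part of the PR; the real class validates against a `ComponentMainThreadExecutor` rather than a raw `Thread`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a gateway that must only be touched by one thread. */
class ConfinedGatewaySketch {
    private final Thread owner; // assumed to play the role of the "main thread"
    private final List<String> blockedEvents = new ArrayList<>();
    private boolean isClosed;

    ConfinedGatewaySketch(Thread owner) {
        this.owner = owner;
    }

    /** Fails fast when called from any thread other than the owner. */
    private void checkRunsInMainThread() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("Not running in the main thread");
        }
    }

    boolean tryCloseGateway() {
        checkRunsInMainThread(); // guards the write to isClosed
        isClosed = true;
        return true;
    }

    /** Returns true if the event was buffered, false if it would be sent right away. */
    boolean sendEvent(String evt) {
        checkRunsInMainThread(); // guards the reads of isClosed and blockedEvents
        if (isClosed) {
            blockedEvents.add(evt);
            return true;
        }
        return false;
    }

    int bufferedCount() {
        return blockedEvents.size();
    }

    /** Runs the action on a foreign thread and reports whether the guard rejected it. */
    static boolean rejectedFromOtherThread(Runnable action) {
        final boolean[] rejected = {false};
        Thread t = new Thread(() -> {
            try {
                action.run();
            } catch (IllegalStateException e) {
                rejected[0] = true;
            }
        });
        t.start();
        try {
            t.join(); // join() makes the write to rejected[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return rejected[0];
    }
}
```

With the check in both methods, a call such as `rejectedFromOtherThread(() -> gateway.sendEvent("evt"))` reports the violation instead of silently racing on the shared fields.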



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -236,7 +240,20 @@ public void notifyCheckpointComplete(long checkpointId) {
         // checkpoint coordinator time thread.
         // we can remove the delegation once the checkpoint coordinator runs fully in the
         // scheduler's main thread executor
-        mainThreadExecutor.execute(() -> coordinator.notifyCheckpointComplete(checkpointId));
+        mainThreadExecutor.execute(
+                () -> {
+                    coordinator.notifyCheckpointComplete(checkpointId);
+                    for (Map.Entry<Integer, CloseableSubtaskGateway> entry :
+                            subtaskGatewayMap.entrySet()) {
+                        if (entry.getValue().isClosed()) {
+                            LOG.warn(

Review Comment:
   Is there any case in which this would happen? If not, would it be simpler to throw an exception?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CloseableSubtaskGateway.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway}
+ * that can be temporarily closed, blocking events from going through, buffering them, and releasing
+ * them later. It is used for "alignment" of operator event streams with checkpoint barrier
+ * injection, similar to how the input channels are aligned during a common checkpoint.
+ *
+ * <p>This class is NOT thread safe, but assumed to be used in a single threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox components like RPC
+ * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
+ */
+class CloseableSubtaskGateway implements OperatorCoordinator.SubtaskGateway {

Review Comment:
   Typically `CloseableXXX` refers to classes that implement the `java.io.Closeable` interface. Could we rename this class to reduce confusion?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -277,29 +300,33 @@ private void checkpointCoordinatorInternal(
             final long checkpointId, final CompletableFuture<byte[]> result) {
         mainThreadExecutor.assertRunningInMainThread();
 
-        final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
-
-        FutureUtils.assertNoException(
-                coordinatorCheckpoint.handleAsync(
-                        (success, failure) -> {
-                            if (failure != null) {
-                                result.completeExceptionally(failure);
-                            } else if (eventValve.tryShutValve(checkpointId)) {
-                                completeCheckpointOnceEventsAreDone(checkpointId, result, success);
-                            } else {
-                                // if we cannot shut the valve, this means the checkpoint
-                                // has been aborted before, so the future is already
-                                // completed exceptionally. but we try to complete it here
-                                // again, just in case, as a safety net.
-                                result.completeExceptionally(
-                                        new FlinkException("Cannot shut event valve"));
-                            }
-                            return null;
-                        },
-                        mainThreadExecutor));
-
         try {
-            eventValve.markForCheckpoint(checkpointId);
+            subtaskGatewayMap.values().forEach(x -> x.markForCheckpoint(checkpointId));
+
+            final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
+
+            coordinatorCheckpoint.whenComplete(
+                    (success, failure) -> {
+                        if (failure != null) {
+                            result.completeExceptionally(failure);
+                        } else {
+                            closeGateways(checkpointId, result);

Review Comment:
   `coordinatorCheckpoint.whenComplete(...)` does not guarantee that the provided `action` is invoked by the current thread. To make sure `closeGateways()` is still executed by `mainThreadExecutor`, it seems necessary to use `handleAsync(...)`, as was done before this PR.
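
The difference can be observed with plain `java.util.concurrent` classes. The sketch below is only an illustration: `CallbackThreadSketch`, the thread names and the single-threaded executor standing in for `mainThreadExecutor` are assumptions, not code from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreadSketch {

    /**
     * Returns the names of the threads that ran the callbacks:
     * index 0 for whenComplete, index 1 for handleAsync with an explicit executor.
     */
    static String[] observeCallbackThreads() {
        // Hypothetical stand-in for the scheduler's main thread executor.
        ExecutorService mainThreadExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-executor"));
        try {
            CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();

            // Non-async callback: typically runs on whichever thread completes the future.
            CompletableFuture<String> viaWhenComplete = new CompletableFuture<>();
            coordinatorCheckpoint.whenComplete(
                    (success, failure) ->
                            viaWhenComplete.complete(Thread.currentThread().getName()));

            // Async callback: always submitted to the given executor.
            CompletableFuture<String> viaHandleAsync =
                    coordinatorCheckpoint.handleAsync(
                            (success, failure) -> Thread.currentThread().getName(),
                            mainThreadExecutor);

            // Complete the future from an unrelated thread, as a coordinator might do.
            new Thread(() -> coordinatorCheckpoint.complete(new byte[0]), "completer").start();

            return new String[] {viaWhenComplete.join(), viaHandleAsync.join()};
        } finally {
            mainThreadExecutor.shutdown();
        }
    }
}
```

Only the `handleAsync` variant pins the callback to the executor; the `whenComplete` callback follows the completing thread, which is why a call like `closeGateways()` placed there could end up outside the main thread.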
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CloseableSubtaskGateway.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway}
+ * that can be temporarily closed, blocking events from going through, buffering them, and releasing
+ * them later. It is used for "alignment" of operator event streams with checkpoint barrier
+ * injection, similar to how the input channels are aligned during a common checkpoint.
+ *
+ * <p>This class is NOT thread safe, but assumed to be used in a single threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox components like RPC
+ * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
+ */
+class CloseableSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+    /** The wrapped gateway that actually performs the event-sending operation. */
+    private final OperatorCoordinator.SubtaskGateway innerGateway;
+
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+    private long currentCheckpointId;
+
+    private long lastCheckpointId;
+
+    private boolean isClosed;
+
+    @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
+
+    CloseableSubtaskGateway(OperatorCoordinator.SubtaskGateway innerGateway) {
+        this.innerGateway = innerGateway;
+        this.currentCheckpointId = NO_CHECKPOINT;
+        this.lastCheckpointId = Long.MIN_VALUE;
+    }
+
+    void setMainThreadExecutorForValidation(ComponentMainThreadExecutor mainThreadExecutor) {
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
+    /**
+     * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+     * allow closing the gateway for this specific checkpoint.
+     *
+     * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+     * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+     * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+     * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+     * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+     */
+    void markForCheckpoint(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
+        }
+    }
+
+    /**
+     * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+     * re-opened. If the gateway is already closed, this does nothing.
+     *
+     * @return True if the gateway is closed, false if the checkpointId is incorrect.
+     */
+    boolean tryCloseGateway(long checkpointId) {
+        if (checkpointId == currentCheckpointId) {
+            isClosed = true;
+            return true;
+        }
+        return false;
+    }
+
+    boolean isClosed() {
+        return isClosed;
+    }
+
+    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
+        checkRunsInMainThread();
+
+        if (expectedCheckpointId != currentCheckpointId) {
+            return;
+        }
+        openGatewayAndUnmarkCheckpoint();
+    }
+
+    /** Opens the gateway, releasing all buffered events. */
+    void openGatewayAndUnmarkCheckpoint() {

Review Comment:
   Would it be more readable to mark this method as either public or private? Same for other methods.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CloseableSubtaskGateway.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway}
+ * that can be temporarily closed, blocking events from going through, buffering them, and releasing
+ * them later. It is used for "alignment" of operator event streams with checkpoint barrier
+ * injection, similar to how the input channels are aligned during a common checkpoint.
+ *
+ * <p>This class is NOT thread safe, but assumed to be used in a single threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox components like RPC
+ * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
+ */
+class CloseableSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+    /** The wrapped gateway that actually performs the event-sending operation. */
+    private final OperatorCoordinator.SubtaskGateway innerGateway;
+
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+    private long currentCheckpointId;
+
+    private long lastCheckpointId;
+
+    private boolean isClosed;
+
+    @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
+
+    CloseableSubtaskGateway(OperatorCoordinator.SubtaskGateway innerGateway) {
+        this.innerGateway = innerGateway;
+        this.currentCheckpointId = NO_CHECKPOINT;
+        this.lastCheckpointId = Long.MIN_VALUE;
+    }
+
+    void setMainThreadExecutorForValidation(ComponentMainThreadExecutor mainThreadExecutor) {
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
+    /**
+     * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+     * allow closing the gateway for this specific checkpoint.
+     *
+     * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+     * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+     * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+     * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+     * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+     */
+    void markForCheckpoint(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
+        }
+    }
+
+    /**
+     * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+     * re-opened. If the gateway is already closed, this does nothing.
+     *
+     * @return True if the gateway is closed, false if the checkpointId is incorrect.
+     */
+    boolean tryCloseGateway(long checkpointId) {
+        if (checkpointId == currentCheckpointId) {
+            isClosed = true;
+            return true;
+        }
+        return false;
+    }
+
+    boolean isClosed() {
+        return isClosed;
+    }
+
+    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
+        checkRunsInMainThread();
+
+        if (expectedCheckpointId != currentCheckpointId) {
+            return;
+        }
+        openGatewayAndUnmarkCheckpoint();
+    }
+
+    /** Opens the gateway, releasing all buffered events. */
+    void openGatewayAndUnmarkCheckpoint() {
+        checkRunsInMainThread();
+
+        currentCheckpointId = NO_CHECKPOINT;
+        if (!isClosed) {
+            return;
+        }
+
+        for (BlockedEvent blockedEvent : blockedEvents) {
+            CompletableFuture<Acknowledge> result =
+                    innerGateway.sendEvent(blockedEvent.operatorEvent);
+            FutureUtils.forward(result, blockedEvent.future);
+        }
+        blockedEvents.clear();
+
+        isClosed = false;
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) {
+        if (isClosed) {
+            CompletableFuture<Acknowledge> sendResult = new CompletableFuture<>();

Review Comment:
   Suppose the given operator event cannot be serialized and the gateway is closed. The exception will then be added to the future returned to the caller.
   
   This behavior is inconsistent with the case when the gateway is open. Also note that many callers currently do not check the future returned by this method. This means that an exception that could previously be caught might now be missed.
   
   Would it be safer to keep the previous behavior by always throwing the serialization exception directly to the caller?
   
   Same for the exception that is thrown when the gateway is not ready.
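
To illustrate the concern, here is a small sketch under simplifying assumptions: `SendFailureSketch` and its methods are hypothetical, a `null` event stands in for an event that cannot be serialized, and a `String` stands in for `Acknowledge`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SendFailureSketch {

    /** Open-gateway style: the failure is thrown directly to the caller. */
    static CompletableFuture<String> sendThrowing(Object evt) {
        if (evt == null) {
            throw new IllegalArgumentException("Cannot serialize operator event");
        }
        return CompletableFuture.completedFuture("ack");
    }

    /** Closed-gateway style: the failure is only recorded in the returned future. */
    static CompletableFuture<String> sendDeferred(Object evt) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (evt == null) {
            result.completeExceptionally(
                    new IllegalArgumentException("Cannot serialize operator event"));
        } else {
            result.complete("ack");
        }
        return result;
    }

    /**
     * Mimics a caller that ignores the returned future. Returns true only if the
     * failure surfaced as an exception at the call site.
     */
    static boolean callerNoticesFailure(Function<Object, CompletableFuture<String>> send) {
        try {
            send.apply(null); // the returned future is deliberately dropped
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }
}
```

A caller that drops the future notices the failure from `sendThrowing` but not from `sendDeferred`, even though the deferred future is completed exceptionally.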



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/CloseableSubtaskGateway.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An {@link org.apache.flink.runtime.operators.coordination.OperatorCoordinator.SubtaskGateway}
+ * that can be temporarily closed, blocking events from going through, buffering them, and releasing
+ * them later. It is used for "alignment" of operator event streams with checkpoint barrier
+ * injection, similar to how the input channels are aligned during a common checkpoint.
+ *
+ * <p>This class is NOT thread safe, but assumed to be used in a single threaded context. To guard
+ * that, one can register a "main thread executor" (as used by the mailbox components like RPC
+ * components) via {@link #setMainThreadExecutorForValidation(ComponentMainThreadExecutor)}.
+ */
+class CloseableSubtaskGateway implements OperatorCoordinator.SubtaskGateway {
+
+    /** The wrapped gateway that actually performs the event-sending operation. */
+    private final OperatorCoordinator.SubtaskGateway innerGateway;
+
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+    private long currentCheckpointId;
+
+    private long lastCheckpointId;
+
+    private boolean isClosed;
+
+    @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
+
+    CloseableSubtaskGateway(OperatorCoordinator.SubtaskGateway innerGateway) {
+        this.innerGateway = innerGateway;
+        this.currentCheckpointId = NO_CHECKPOINT;
+        this.lastCheckpointId = Long.MIN_VALUE;
+    }
+
+    void setMainThreadExecutorForValidation(ComponentMainThreadExecutor mainThreadExecutor) {
+        this.mainThreadExecutor = mainThreadExecutor;
+    }
+
+    /**
+     * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+     * allow closing the gateway for this specific checkpoint.
+     *
+     * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+     * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+     * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+     * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+     * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+     */
+    void markForCheckpoint(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
+        }
+    }
+
+    /**
+     * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+     * re-opened. If the gateway is already closed, this does nothing.
+     *
+     * @return True if the gateway is closed, false if the checkpointId is incorrect.
+     */
+    boolean tryCloseGateway(long checkpointId) {
+        if (checkpointId == currentCheckpointId) {
+            isClosed = true;
+            return true;
+        }
+        return false;
+    }
+
+    boolean isClosed() {
+        return isClosed;
+    }
+
+    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {

Review Comment:
   Do we expect the communication from coordinator to operator to always support `openGatewayAndUnmarkCheckpoint`? 
   
   If yes, would it be more reasonable to put such methods in the `SubtaskGateway` and merge `CloseableSubtaskGateway` into `SubtaskGatewayImpl`?


