Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/22 08:07:10 UTC

[GitHub] [flink] lindong28 commented on a diff in pull request #20275: [FLINK-28606][Runtime/Checkpointing] Preserve consistency of OperatorEvent from OperatorCoordinator to subtasks

lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r927315690


##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -284,22 +311,24 @@ private void checkpointCoordinatorInternal(
                         (success, failure) -> {
                             if (failure != null) {
                                 result.completeExceptionally(failure);
-                            } else if (eventValve.tryShutValve(checkpointId)) {
+                            } else if (closeGateways(checkpointId, subtasksToCheckpoint)) {
                                 completeCheckpointOnceEventsAreDone(checkpointId, result, success);
                             } else {
-                                // if we cannot shut the valve, this means the checkpoint
+                                // if we cannot close the gateway, this means the checkpoint
                                 // has been aborted before, so the future is already
                                 // completed exceptionally. but we try to complete it here
                                 // again, just in case, as a safety net.
                                 result.completeExceptionally(
-                                        new FlinkException("Cannot shut event valve"));
+                                        new FlinkException("Cannot close gateway"));
                             }
                             return null;
                         },
                         mainThreadExecutor));
 
         try {
-            eventValve.markForCheckpoint(checkpointId);
+            for (int subtask : subtasksToCheckpoint) {
+                subtaskGatewayMap.get(subtask).markForCheckpoint(checkpointId);

Review Comment:
   Would it be simpler to call `markForCheckpoint()` for every subtask instead of additionally passing `subtasksToCheckpoint` as a function input parameter?
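   
   As a rough sketch of what I have in mind: `MarkableGateway` and `MarkAllHolderSketch` below are simplified, hypothetical stand-ins for `SubtaskGatewayImpl` and `OperatorCoordinatorHolder`, not the actual classes, and the main-thread checks and validation are left out. The holder just iterates over everything in `subtaskGatewayMap`, so no subtask set has to be passed in.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SubtaskGatewayImpl; only the marking state is modeled.
class MarkableGateway {
    static final long NO_CHECKPOINT = Long.MIN_VALUE;

    long currentCheckpointId = NO_CHECKPOINT;

    void markForCheckpoint(long checkpointId) {
        // The real method also validates the ID; that part is omitted here.
        currentCheckpointId = checkpointId;
    }
}

// Hypothetical stand-in for OperatorCoordinatorHolder.
class MarkAllHolderSketch {
    final Map<Integer, MarkableGateway> subtaskGatewayMap = new HashMap<>();

    // Marks every registered gateway instead of a caller-provided subset.
    void markAllForCheckpoint(long checkpointId) {
        for (MarkableGateway gateway : subtaskGatewayMap.values()) {
            gateway.markForCheckpoint(checkpointId);
        }
    }
}
```

   Whether this is acceptable depends on whether marking a gateway for a subtask that is not part of the checkpoint has any side effect, which I have not verified.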



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -120,7 +118,7 @@
     private final OperatorID operatorId;
     private final LazyInitializedCoordinatorContext context;
     private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
-    private final OperatorEventValve eventValve;
+    private final Map<Integer, SubtaskGatewayImpl> subtaskGatewayMap = new HashMap<>();

Review Comment:
   Should we instantiate this variable in the constructor for consistency with `unconfirmedEvents`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java:
##########
@@ -59,15 +66,25 @@ public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoin
 
     public static CompletableFuture<AllCoordinatorSnapshots> triggerAllCoordinatorCheckpoints(
             final Collection<OperatorCoordinatorCheckpointContext> coordinators,
-            final long checkpointId)
+            final PendingCheckpoint checkpoint)
             throws Exception {
 
         final Collection<CompletableFuture<CoordinatorSnapshot>> individualSnapshots =
                 new ArrayList<>(coordinators.size());
 
         for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
+            Set<Integer> subtasksToCheckpoint = new HashSet<>();

Review Comment:
   Is there any conceptual difference between `tasksToWaitFor` and `subtasksToCheckpoint`? If not, it would probably be more readable to use a name that is consistent with `CheckpointPlan::getTasksToWaitFor()`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -18,43 +18,82 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
 import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 /**
  * Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface that access to
  * subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * <p>Instances of this class can be temporarily closed, blocking events from going through,
+ * buffering them, and releasing them later. It is used for "alignment" of operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are aligned during a common
+ * checkpoint.
+ *
+ * <p>The methods on the critical communication path, including closing/reopening the gateway and
+ * sending the operator events, are required to be used in a single-threaded context specified by
+ * the {@link #mainThreadExecutor} in {@link #SubtaskGatewayImpl(SubtaskAccess,
+ * ComponentMainThreadExecutor, IncompleteFuturesTracker)} in order to avoid concurrent issues.

Review Comment:
   `concurrent issues` -> `race conditions`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -308,6 +337,24 @@ private void checkpointCoordinatorInternal(
         }
     }
 
+    private boolean closeGateways(
+            final long checkpointId, final Set<Integer> subtasksToCheckpoint) {
+        boolean hasCloseableGateway = false;
+        for (int subtask : subtasksToCheckpoint) {
+            SubtaskGatewayImpl gateway = subtaskGatewayMap.get(subtask);
+            if (!gateway.tryCloseGateway(checkpointId)) {

Review Comment:
   Would it be simpler to call `tryCloseGateway()` for every subtask instead of only for `subtasksToCheckpoint`?
   
   The logic here seems to assume that `tryCloseGateway(..)` can fail for all subtasks at once, but cannot fail for only a subset of them. Could you explain why the former is possible and the latter is not?
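   
   To make the question concrete, here is a minimal sketch of a variant that tries to close every gateway and treats a mixed outcome as a bug. `ClosableGateway` and `CloseAllHolderSketch` are hypothetical, simplified stand-ins rather than the classes in this PR, and throwing on a partial failure is my assumption about the intended invariant, not something the PR states.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for SubtaskGatewayImpl; only the closing state is modeled.
class ClosableGateway {
    static final long NO_CHECKPOINT = Long.MIN_VALUE;

    long currentCheckpointId = NO_CHECKPOINT;
    boolean isClosed = false;

    // Closes the gateway only if it is marked for the given checkpoint.
    boolean tryCloseGateway(long checkpointId) {
        if (checkpointId == currentCheckpointId) {
            isClosed = true;
            return true;
        }
        return false;
    }
}

// Hypothetical stand-in for OperatorCoordinatorHolder.
class CloseAllHolderSketch {
    final Map<Integer, ClosableGateway> subtaskGatewayMap = new LinkedHashMap<>();

    // Returns true if all gateways were closed and false if none were.
    // A mixed result is assumed to be impossible and is reported as a bug.
    boolean closeGateways(long checkpointId) {
        int closed = 0;
        for (ClosableGateway gateway : subtaskGatewayMap.values()) {
            if (gateway.tryCloseGateway(checkpointId)) {
                closed++;
            }
        }
        if (closed != 0 && closed != subtaskGatewayMap.size()) {
            throw new IllegalStateException(
                    String.format(
                            "Only %d of %d gateways could be closed for checkpoint %d",
                            closed, subtaskGatewayMap.size(), checkpointId));
        }
        return closed > 0;
    }
}
```

   Note that in this sketch a partial failure leaves some gateways closed when the exception is thrown; real code would need to decide how to recover from that.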



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java:
##########
@@ -29,6 +29,9 @@ public interface OperatorEventDispatcher {
     /**
      * Register a listener that is notified every time an OperatorEvent is sent from the
      * OperatorCoordinator (of the operator with the given OperatorID) to this subtask.
+     *
+     * <p>The stream operator with the given OperatorID must implement {@link OperatorEventHandler}

Review Comment:
   The existing comment (prior to this PR) already seems to describe the behavior of calling this method. Could you explain the extra benefit of adding this comment?
   
   And why does the operator with the given operatorID have to implement `OperatorEventHandler`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -18,43 +18,82 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
 import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 /**
  * Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface that access to
  * subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * <p>Instances of this class can be temporarily closed, blocking events from going through,
+ * buffering them, and releasing them later. It is used for "alignment" of operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are aligned during a common
+ * checkpoint.
+ *
+ * <p>The methods on the critical communication path, including closing/reopening the gateway and
+ * sending the operator events, are required to be used in a single-threaded context specified by
+ * the {@link #mainThreadExecutor} in {@link #SubtaskGatewayImpl(SubtaskAccess,
+ * ComponentMainThreadExecutor, IncompleteFuturesTracker)} in order to avoid concurrent issues.
  */
 class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
 
     private static final String EVENT_LOSS_ERROR_MESSAGE =
             "An OperatorEvent from an OperatorCoordinator to a task was lost. "
                     + "Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";
 
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
     private final SubtaskAccess subtaskAccess;
-    private final EventSender sender;
-    private final Executor sendingExecutor;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
     private final IncompleteFuturesTracker incompleteFuturesTracker;
 
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();

Review Comment:
   Should we initialize this variable in the constructor for consistency with e.g. `lastCheckpointId`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -18,43 +18,82 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
 import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 /**
  * Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface that access to
  * subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * <p>Instances of this class can be temporarily closed, blocking events from going through,
+ * buffering them, and releasing them later. It is used for "alignment" of operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are aligned during a common
+ * checkpoint.
+ *
+ * <p>The methods on the critical communication path, including closing/reopening the gateway and
+ * sending the operator events, are required to be used in a single-threaded context specified by
+ * the {@link #mainThreadExecutor} in {@link #SubtaskGatewayImpl(SubtaskAccess,
+ * ComponentMainThreadExecutor, IncompleteFuturesTracker)} in order to avoid concurrent issues.
  */
 class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
 
     private static final String EVENT_LOSS_ERROR_MESSAGE =
             "An OperatorEvent from an OperatorCoordinator to a task was lost. "
                     + "Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";
 
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
     private final SubtaskAccess subtaskAccess;
-    private final EventSender sender;
-    private final Executor sendingExecutor;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
     private final IncompleteFuturesTracker incompleteFuturesTracker;
 
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+    private long currentCheckpointId;
+
+    private long lastCheckpointId;
+
+    private boolean isClosed;

Review Comment:
   Should we explicitly initialize this variable?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -91,27 +130,133 @@ public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) {
                                                         new FlinkException(msg, failure)));
                             }
                         },
-                        sendingExecutor);
+                        mainThreadExecutor);
 
-        sendingExecutor.execute(
+        mainThreadExecutor.execute(
                 () -> {
-                    sender.sendEvent(sendAction, sendResult);
+                    sendEventInternal(sendAction, sendResult);
                     incompleteFuturesTracker.trackFutureWhileIncomplete(result);
                 });
+
         return result;
     }
 
-    @Override
-    public ExecutionAttemptID getExecution() {
-        return subtaskAccess.currentAttempt();
+    private void sendEventInternal(
+            Callable<CompletableFuture<Acknowledge>> sendAction,
+            CompletableFuture<Acknowledge> result) {
+        checkRunsInMainThread();
+
+        if (isClosed) {
+            blockedEvents.add(new BlockedEvent(sendAction, result));
+        } else {
+            callSendAction(sendAction, result);
+        }
     }
 
-    @Override
-    public int getSubtask() {
-        return subtaskAccess.getSubtaskIndex();
+    private void callSendAction(
+            Callable<CompletableFuture<Acknowledge>> sendAction,
+            CompletableFuture<Acknowledge> result) {
+        try {
+            final CompletableFuture<Acknowledge> sendResult = sendAction.call();
+            FutureUtils.forward(sendResult, result);
+        } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalError(t);
+            result.completeExceptionally(t);
+        }
     }
 
-    private boolean isReady() {
-        return subtaskAccess.hasSwitchedToRunning().isDone();
+    /**
+     * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+     * allow closing the gateway for this specific checkpoint.
+     *
+     * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+     * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+     * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+     * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+     * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+     */
+    void markForCheckpoint(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+                            checkpointId, currentCheckpointId));
+        }
+
+        if (checkpointId > lastCheckpointId) {
+            currentCheckpointId = checkpointId;
+            lastCheckpointId = checkpointId;
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+                            lastCheckpointId, checkpointId));
+        }
+    }
+
+    /**
+     * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+     * re-opened. If the gateway is already closed, this does nothing.
+     *
+     * @return True if the gateway is closed, false if the checkpointId is incorrect.
+     */
+    boolean tryCloseGateway(long checkpointId) {
+        checkRunsInMainThread();
+
+        if (checkpointId == currentCheckpointId) {
+            isClosed = true;
+            return true;
+        }
+        return false;
+    }
+
+    boolean isClosed() {
+        checkRunsInMainThread();
+        return isClosed;
+    }
+
+    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
+        checkRunsInMainThread();
+
+        if (expectedCheckpointId != currentCheckpointId) {

Review Comment:
   Prior to this PR, `OperatorEventValve::openValveAndUnmarkCheckpoint(long expectedCheckpointId)` threw an IllegalStateException in this case.
   
   Could it be useful to still throw IllegalStateException if `currentCheckpointId != expectedCheckpointId && currentCheckpointId != NO_CHECKPOINT`?
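   
   Roughly, the check I am suggesting would look like the sketch below. `OpenGatewaySketch` is a hypothetical, simplified stand-in for `SubtaskGatewayImpl`: releasing the blocked events and the main-thread check are omitted, and treating an already unmarked gateway as a no-op is my assumption about the desired behavior.

```java
// Hypothetical stand-in for SubtaskGatewayImpl; only the open/unmark state is modeled.
class OpenGatewaySketch {
    static final long NO_CHECKPOINT = Long.MIN_VALUE;

    long currentCheckpointId = NO_CHECKPOINT;
    boolean isClosed = false;

    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
        // Marked for a different checkpoint: this indicates a bug, so fail loudly.
        if (currentCheckpointId != NO_CHECKPOINT
                && currentCheckpointId != expectedCheckpointId) {
            throw new IllegalStateException(
                    String.format(
                            "Gateway is marked for checkpoint %d, but was asked to reopen for checkpoint %d",
                            currentCheckpointId, expectedCheckpointId));
        }

        // Already unmarked, e.g. because the checkpoint was aborted earlier: nothing to do.
        if (currentCheckpointId == NO_CHECKPOINT) {
            return;
        }

        // Marked for the expected checkpoint: unmark and reopen.
        currentCheckpointId = NO_CHECKPOINT;
        isClosed = false;
    }
}
```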



##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -18,43 +18,82 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
 import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
 
 /**
  * Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface that access to
  * subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * <p>Instances of this class can be temporarily closed, blocking events from going through,
+ * buffering them, and releasing them later. It is used for "alignment" of operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are aligned during a common
+ * checkpoint.
+ *
+ * <p>The methods on the critical communication path, including closing/reopening the gateway and
+ * sending the operator events, are required to be used in a single-threaded context specified by
+ * the {@link #mainThreadExecutor} in {@link #SubtaskGatewayImpl(SubtaskAccess,
+ * ComponentMainThreadExecutor, IncompleteFuturesTracker)} in order to avoid concurrent issues.
  */
 class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
 
     private static final String EVENT_LOSS_ERROR_MESSAGE =
             "An OperatorEvent from an OperatorCoordinator to a task was lost. "
                     + "Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";
 
+    private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
     private final SubtaskAccess subtaskAccess;
-    private final EventSender sender;
-    private final Executor sendingExecutor;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
     private final IncompleteFuturesTracker incompleteFuturesTracker;
 
+    private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+    private long currentCheckpointId;
+
+    private long lastCheckpointId;
+
+    private boolean isClosed;
+
     SubtaskGatewayImpl(
             SubtaskAccess subtaskAccess,
-            EventSender sender,
-            Executor sendingExecutor,
+            ComponentMainThreadExecutor mainThreadExecutor,
             IncompleteFuturesTracker incompleteFuturesTracker) {
         this.subtaskAccess = subtaskAccess;
-        this.sender = sender;
-        this.sendingExecutor = sendingExecutor;
+        this.mainThreadExecutor = mainThreadExecutor;
         this.incompleteFuturesTracker = incompleteFuturesTracker;
+        this.currentCheckpointId = NO_CHECKPOINT;
+        this.lastCheckpointId = Long.MIN_VALUE;
+    }
+
+    @Override
+    public ExecutionAttemptID getExecution() {

Review Comment:
   nit: it is generally preferable not to move code around unless there are clear benefits (e.g. a code style pattern that we can consistently enforce in the future).
   
   Do we need to move these 3 methods here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
