Posted to issues@flink.apache.org by "rkhachatryan (via GitHub)" <gi...@apache.org> on 2023/04/11 21:31:12 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #21923: FLINK-13871: Consolidate volatile status fields in StreamTask

rkhachatryan commented on code in PR #21923:
URL: https://github.com/apache/flink/pull/21923#discussion_r1163352135


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -980,8 +963,7 @@ protected CompletableFuture<Void> getCompletionFuture() {
 
     @Override
     public final void cancel() throws Exception {
-        isRunning = false;
-        canceled = true;
+        taskState.canceled = true;

Review Comment:
   Shouldn't we also update `taskState.status` here?
   
   (Ideally, in my view, we should use either an enum or a set of flags inside `TaskState`, but not both.)
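
For illustration only, here is a minimal sketch of the enum-only option. The class name `EnumOnlyTaskState`, its methods, and the `CANCELED` constant are assumptions made for this example and are not taken from the PR. With a single status field, `cancel()` performs one update in place of the old `isRunning = false; canceled = true;` pair, and both "running" and "canceled" are derived from it.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical enum-only variant; all names are illustrative, not taken from the PR. */
final class EnumOnlyTaskState {

    /** CANCELED is assumed here purely to show the enum-only design. */
    enum Status { INITIALIZED, RESTORING, RUNNING, CANCELED, FINISHED }

    private final AtomicReference<Status> status =
            new AtomicReference<>(Status.INITIALIZED);

    /** Atomically moves from {@code expected} to {@code next}; returns whether it succeeded. */
    boolean compareAndSet(Status expected, Status next) {
        return status.compareAndSet(expected, next);
    }

    /** Marks the task canceled unless it is already canceled or finished. */
    boolean cancel() {
        while (true) {
            Status current = status.get();
            if (current == Status.CANCELED || current == Status.FINISHED) {
                return false;
            }
            if (status.compareAndSet(current, Status.CANCELED)) {
                return true;
            }
            // Another thread changed the status in between; re-read and retry.
        }
    }

    /** Derived from the single status field, so it cannot disagree with isCanceled(). */
    boolean isRunning() {
        return status.get() == Status.RUNNING;
    }

    boolean isCanceled() {
        return status.get() == Status.CANCELED;
    }
}
```

Because every query reads the same field, there is no window in which `canceled` is true while the status still says the task is running.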



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1797,4 +1779,74 @@ public interface CanEmitBatchOfRecordsChecker {
 
         boolean check();
     }
+
+    /** Possible states of a Task. */
+    private static class TaskState {
+        /**
+         * An enumeration of all states that a task can be in during its execution. Transitions
+         * usually follow the diagram bellow:
+         *
+         * <pre>{@code
+         * INITIALIZED  -> RESTORING -> RUNNING -> FINISHED
+         *                 |             |           |
+         *                 |             |           |
+         *                 |             V           |
+         *                 |           CANCELED -----+

Review Comment:
   The `CANCELED` state was removed from the code, but it still appears in this diagram.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1797,4 +1779,74 @@ public interface CanEmitBatchOfRecordsChecker {
 
         boolean check();
     }
+
+    /** Possible states of a Task. */
+    private static class TaskState {
+        /**
+         * An enumeration of all states that a task can be in during its execution. Transitions
+         * usually follow the diagram bellow:
+         *
+         * <pre>{@code
+         * INITIALIZED  -> RESTORING -> RUNNING -> FINISHED
+         *                 |             |           |
+         *                 |             |           |
+         *                 |             V           |
+         *                 |           CANCELED -----+
+         *                 |            |
+         *                 +------------+
+         *
+         * }</pre>
+         *
+         * <p>Task enters {@code RESTORING} status before {@code RUNNING} to restore an invokable
+         * object from the last valid state, if any.
+         *
+         * <p>Task enters {@code FINISHED} status through cleanup method, regardless of
+         * cancellations or if the previous call succeeded.
+         *
+         * <p>It is possible for a Task to be {@code failing} while being in any status e.g, failing
+         * == true while status is RUNNING
+         */
+        private enum Status {
+            /** Task has successfully terminated. */
+            FINISHED(),
+            /** The task is "in operation". */
+            RUNNING(FINISHED),
+            /** The task is restoring during {@link #restore()}. */
+            RESTORING(RUNNING, FINISHED),
+            /** Task constructor was called on init state. */
+            INITIALIZED(RESTORING);
+
+            Status[] transitions;
+
+            Status(Status... transitions) {
+                this.transitions = transitions;
+            }
+        }
+
+        /**
+         * Task is failing e.g., if an exception occurred inside {@link #invoke()}. Note that this
+         * can happen while Task is still in Running status.
+         */
+        private volatile boolean failing = false;
+
+        /**
+         * Task has been canceled. Note that can value can be true even while Task is still in
+         * Running status, e.g., canceled externally by TaskCanceler.
+         */
+        private volatile boolean canceled = false;
+
+        private volatile Status status = Status.INITIALIZED;
+
+        final boolean transitionTo(Status newStatus) {
+            return Arrays.stream(status.transitions).anyMatch(newStatus::equals);
+        }

Review Comment:
   Judging by this method's name and its usages, shouldn't it actually update the state (rather than just return a boolean)?
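
A minimal sketch of a `transitionTo` that both validates and applies the change is shown below. The class `TaskStateSketch`, the `successors()` helper, and the use of `synchronized` are assumptions made for this example; only the allowed transitions are copied from the enum in the diff.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical stand-in for the TaskState in the diff; not the PR's actual code. */
final class TaskStateSketch {

    enum Status {
        INITIALIZED, RESTORING, RUNNING, FINISHED;

        /** Allowed successors, mirroring the transitions declared in the diff. */
        Set<Status> successors() {
            switch (this) {
                case INITIALIZED:
                    return EnumSet.of(RESTORING);
                case RESTORING:
                    return EnumSet.of(RUNNING, FINISHED);
                case RUNNING:
                    return EnumSet.of(FINISHED);
                default:
                    return EnumSet.noneOf(Status.class);
            }
        }
    }

    private Status status = Status.INITIALIZED;

    /**
     * Applies the transition if it is allowed. Returns false and leaves the
     * status unchanged otherwise.
     */
    synchronized boolean transitionTo(Status newStatus) {
        if (!status.successors().contains(newStatus)) {
            return false;
        }
        status = newStatus;
        return true;
    }

    synchronized Status getStatus() {
        return status;
    }
}
```

The check and the write happen under one lock here because a `volatile` field alone does not make a check-then-set sequence atomic.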


