You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/13 18:20:26 UTC

[airflow] 06/38: Add State types for tasks and DAGs (#15285)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit dd946470719fc363159c92438ab0e5c93ebae39e
Author: Andrew Godwin <an...@astronomer.io>
AuthorDate: Tue Jul 6 17:36:01 2021 -0600

    Add State types for tasks and DAGs (#15285)
    
    This adds TaskInstanceState and DagRunState enum types that contain all possible states, makes all other core state constants derive their values from them, and adds a couple of initial type hints that use the new enums (with the plan being that we can add significantly more later).
    
    closes: #9387
    (cherry picked from commit 2b7c59619b7dd6fd5031745ade7756466456f803)
---
 airflow/models/dagrun.py |   6 +-
 airflow/typing_compat.py |   2 +-
 airflow/utils/state.py   | 169 ++++++++++++++++++++++++++++-------------------
 3 files changed, 107 insertions(+), 70 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 3bd86ee..f29d6ac 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -48,7 +48,7 @@ from airflow.utils import callback_requests, timezone
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
 from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, with_row_locks
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState
 from airflow.utils.types import DagRunType
 
 if TYPE_CHECKING:
@@ -302,7 +302,9 @@ class DagRun(Base, LoggingMixin):
         return f"{run_type}__{execution_date.isoformat()}"
 
     @provide_session
-    def get_task_instances(self, state=None, session=None) -> Iterable[TI]:
+    def get_task_instances(
+        self, state: Optional[Iterable[TaskInstanceState]] = None, session=None
+    ) -> Iterable[TI]:
         """Returns the task instances for this dag run"""
         tis = session.query(TI).filter(
             TI.dag_id == self.dag_id,
diff --git a/airflow/typing_compat.py b/airflow/typing_compat.py
index 0f3db75..26237c2 100644
--- a/airflow/typing_compat.py
+++ b/airflow/typing_compat.py
@@ -22,7 +22,7 @@ codebase easier.
 """
 
 try:
-    # Protocol and TypedDict are only added to typing module starting from
+    # Literal, Protocol and TypedDict are only added to typing module starting from
     # python 3.8 we can safely remove this shim import after Airflow drops
     # support for <3.8
     from typing import Literal, Protocol, TypedDict, runtime_checkable  # type: ignore
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index d5300e1..b1b27d0 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -16,70 +16,101 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from enum import Enum
+from typing import Dict, FrozenSet, Tuple
+
 from airflow.settings import STATE_COLORS
+from airflow.utils.types import Optional
 
 
-class State:
+class TaskInstanceState(str, Enum):
     """
-    Static class with task instance states constants and color method to
-    avoid hardcoding.
+    Enum that represents all possible states that a Task Instance can be in.
+
+    Note that None is also allowed, so always use this in a type hint with Optional.
     """
 
-    # scheduler
-    NONE = None  # type: None
-    REMOVED = "removed"
-    SCHEDULED = "scheduled"
+    # Set by the scheduler
+    # None - Task is created but should not run yet
+    REMOVED = "removed"  # Task vanished from DAG before it ran
+    SCHEDULED = "scheduled"  # Task should run and will be handed to executor soon
+
+    # Set by the task instance itself
+    QUEUED = "queued"  # Executor has enqueued the task
+    RUNNING = "running"  # Task is executing
+    SUCCESS = "success"  # Task completed
+    SHUTDOWN = "shutdown"  # External request to shut down
+    FAILED = "failed"  # Task errored out
+    UP_FOR_RETRY = "up_for_retry"  # Task failed but has retries left
+    UP_FOR_RESCHEDULE = "up_for_reschedule"  # A waiting `reschedule` sensor
+    UPSTREAM_FAILED = "upstream_failed"  # One or more upstream deps failed
+    SKIPPED = "skipped"  # Skipped by branching or some other mechanism
+    SENSING = "sensing"  # Smart sensor offloaded to the sensor DAG
 
-    # set by the executor (t.b.d.)
-    # LAUNCHED = "launched"
+    def __str__(self) -> str:  # pylint: disable=invalid-str-returned
+        return self.value
+
+
+class DagRunState(str, Enum):
+    """
+    Enum that represents all possible states that a DagRun can be in.
+
+    These are "shared" with TaskInstanceState in some parts of the code,
+    so please ensure that their values always match the ones with the
+    same name in TaskInstanceState.
+    """
 
-    # set by a task
-    QUEUED = "queued"
     RUNNING = "running"
     SUCCESS = "success"
-    SHUTDOWN = "shutdown"  # External request to shut down
     FAILED = "failed"
-    UP_FOR_RETRY = "up_for_retry"
-    UP_FOR_RESCHEDULE = "up_for_reschedule"
-    UPSTREAM_FAILED = "upstream_failed"
-    SKIPPED = "skipped"
-    SENSING = "sensing"
-
-    task_states = (
-        SUCCESS,
-        RUNNING,
-        FAILED,
-        UPSTREAM_FAILED,
-        SKIPPED,
-        UP_FOR_RETRY,
-        UP_FOR_RESCHEDULE,
-        QUEUED,
-        NONE,
-        SCHEDULED,
-        SENSING,
-        REMOVED,
-    )
 
-    dag_states = (
-        SUCCESS,
-        RUNNING,
-        FAILED,
+
+class State:
+    """
+    Static class with task instance state constants and color methods to
+    avoid hardcoding.
+    """
+
+    # Backwards-compat constants for code that does not yet use the enum
+    # These first three are shared by DagRunState and TaskInstanceState
+    SUCCESS = TaskInstanceState.SUCCESS
+    RUNNING = TaskInstanceState.RUNNING
+    FAILED = TaskInstanceState.FAILED
+
+    # These are TaskInstanceState only
+    NONE = None
+    REMOVED = TaskInstanceState.REMOVED
+    SCHEDULED = TaskInstanceState.SCHEDULED
+    QUEUED = TaskInstanceState.QUEUED
+    SHUTDOWN = TaskInstanceState.SHUTDOWN
+    UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
+    UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
+    UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
+    SKIPPED = TaskInstanceState.SKIPPED
+    SENSING = TaskInstanceState.SENSING
+
+    task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState)
+
+    dag_states: Tuple[DagRunState, ...] = (
+        DagRunState.SUCCESS,
+        DagRunState.RUNNING,
+        DagRunState.FAILED,
     )
 
-    state_color = {
-        QUEUED: 'gray',
-        RUNNING: 'lime',
-        SUCCESS: 'green',
-        SHUTDOWN: 'blue',
-        FAILED: 'red',
-        UP_FOR_RETRY: 'gold',
-        UP_FOR_RESCHEDULE: 'turquoise',
-        UPSTREAM_FAILED: 'orange',
-        SKIPPED: 'pink',
-        REMOVED: 'lightgrey',
-        SCHEDULED: 'tan',
-        NONE: 'lightblue',
-        SENSING: 'lightseagreen',
+    state_color: Dict[Optional[TaskInstanceState], str] = {
+        None: 'lightblue',
+        TaskInstanceState.QUEUED: 'gray',
+        TaskInstanceState.RUNNING: 'lime',
+        TaskInstanceState.SUCCESS: 'green',
+        TaskInstanceState.SHUTDOWN: 'blue',
+        TaskInstanceState.FAILED: 'red',
+        TaskInstanceState.UP_FOR_RETRY: 'gold',
+        TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise',
+        TaskInstanceState.UPSTREAM_FAILED: 'orange',
+        TaskInstanceState.SKIPPED: 'pink',
+        TaskInstanceState.REMOVED: 'lightgrey',
+        TaskInstanceState.SCHEDULED: 'tan',
+        TaskInstanceState.SENSING: 'lightseagreen',
     }
     state_color.update(STATE_COLORS)  # type: ignore
 
@@ -96,17 +127,17 @@ class State:
             return 'white'
         return 'black'
 
-    running = frozenset([RUNNING, SENSING])
+    running: FrozenSet[TaskInstanceState] = frozenset([TaskInstanceState.RUNNING, TaskInstanceState.SENSING])
     """
     A list of states indicating that a task is being executed.
     """
 
-    finished = frozenset(
+    finished: FrozenSet[TaskInstanceState] = frozenset(
         [
-            SUCCESS,
-            FAILED,
-            SKIPPED,
-            UPSTREAM_FAILED,
+            TaskInstanceState.SUCCESS,
+            TaskInstanceState.FAILED,
+            TaskInstanceState.SKIPPED,
+            TaskInstanceState.UPSTREAM_FAILED,
         ]
     )
     """
@@ -118,16 +149,16 @@ class State:
     case, it is no longer running.
     """
 
-    unfinished = frozenset(
+    unfinished: FrozenSet[Optional[TaskInstanceState]] = frozenset(
         [
-            NONE,
-            SCHEDULED,
-            QUEUED,
-            RUNNING,
-            SENSING,
-            SHUTDOWN,
-            UP_FOR_RETRY,
-            UP_FOR_RESCHEDULE,
+            None,
+            TaskInstanceState.SCHEDULED,
+            TaskInstanceState.QUEUED,
+            TaskInstanceState.RUNNING,
+            TaskInstanceState.SENSING,
+            TaskInstanceState.SHUTDOWN,
+            TaskInstanceState.UP_FOR_RETRY,
+            TaskInstanceState.UP_FOR_RESCHEDULE,
         ]
     )
     """
@@ -135,12 +166,16 @@ class State:
     a run or has not even started.
     """
 
-    failed_states = frozenset([FAILED, UPSTREAM_FAILED])
+    failed_states: FrozenSet[TaskInstanceState] = frozenset(
+        [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
+    )
     """
     A list of states indicating that a task or dag is a failed state.
     """
 
-    success_states = frozenset([SUCCESS, SKIPPED])
+    success_states: FrozenSet[TaskInstanceState] = frozenset(
+        [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
+    )
     """
     A list of states indicating that a task or dag is a success state.
     """