You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/13 18:20:26 UTC
[airflow] 06/38: Add State types for tasks and DAGs (#15285)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit dd946470719fc363159c92438ab0e5c93ebae39e
Author: Andrew Godwin <an...@astronomer.io>
AuthorDate: Tue Jul 6 17:36:01 2021 -0600
Add State types for tasks and DAGs (#15285)
This adds TaskInstanceState and DagRunState enum types that contain all possible states, makes all other core state constants derive their values from them, and adds a couple of initial type hints that use the new enums (with the plan being that we can add significantly more later).
closes: #9387
(cherry picked from commit 2b7c59619b7dd6fd5031745ade7756466456f803)
---
airflow/models/dagrun.py | 6 +-
airflow/typing_compat.py | 2 +-
airflow/utils/state.py | 169 ++++++++++++++++++++++++++++-------------------
3 files changed, 107 insertions(+), 70 deletions(-)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 3bd86ee..f29d6ac 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -48,7 +48,7 @@ from airflow.utils import callback_requests, timezone
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, skip_locked, with_row_locks
-from airflow.utils.state import State
+from airflow.utils.state import State, TaskInstanceState
from airflow.utils.types import DagRunType
if TYPE_CHECKING:
@@ -302,7 +302,9 @@ class DagRun(Base, LoggingMixin):
return f"{run_type}__{execution_date.isoformat()}"
@provide_session
- def get_task_instances(self, state=None, session=None) -> Iterable[TI]:
+ def get_task_instances(
+ self, state: Optional[Iterable[TaskInstanceState]] = None, session=None
+ ) -> Iterable[TI]:
"""Returns the task instances for this dag run"""
tis = session.query(TI).filter(
TI.dag_id == self.dag_id,
diff --git a/airflow/typing_compat.py b/airflow/typing_compat.py
index 0f3db75..26237c2 100644
--- a/airflow/typing_compat.py
+++ b/airflow/typing_compat.py
@@ -22,7 +22,7 @@ codebase easier.
"""
try:
- # Protocol and TypedDict are only added to typing module starting from
+ # Literal, Protocol and TypedDict are only added to typing module starting from
# python 3.8 we can safely remove this shim import after Airflow drops
# support for <3.8
from typing import Literal, Protocol, TypedDict, runtime_checkable # type: ignore
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index d5300e1..b1b27d0 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -16,70 +16,101 @@
# specific language governing permissions and limitations
# under the License.
+from enum import Enum
+from typing import Dict, FrozenSet, Tuple
+
from airflow.settings import STATE_COLORS
+from airflow.utils.types import Optional
-class State:
+class TaskInstanceState(str, Enum):
"""
- Static class with task instance states constants and color method to
- avoid hardcoding.
+ Enum that represents all possible states that a Task Instance can be in.
+
+ Note that None is also allowed, so always use this in a type hint with Optional.
"""
- # scheduler
- NONE = None # type: None
- REMOVED = "removed"
- SCHEDULED = "scheduled"
+ # Set by the scheduler
+ # None - Task is created but should not run yet
+ REMOVED = "removed" # Task vanished from DAG before it ran
+ SCHEDULED = "scheduled" # Task should run and will be handed to executor soon
+
+ # Set by the task instance itself
+ QUEUED = "queued" # Executor has enqueued the task
+ RUNNING = "running" # Task is executing
+ SUCCESS = "success" # Task completed
+ SHUTDOWN = "shutdown" # External request to shut down
+ FAILED = "failed" # Task errored out
+ UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
+ UP_FOR_RESCHEDULE = "up_for_reschedule" # A waiting `reschedule` sensor
+ UPSTREAM_FAILED = "upstream_failed" # One or more upstream deps failed
+ SKIPPED = "skipped" # Skipped by branching or some other mechanism
+ SENSING = "sensing" # Smart sensor offloaded to the sensor DAG
- # set by the executor (t.b.d.)
- # LAUNCHED = "launched"
+ def __str__(self) -> str: # pylint: disable=invalid-str-returned
+ return self.value
+
+
+class DagRunState(str, Enum):
+ """
+ Enum that represents all possible states that a DagRun can be in.
+
+ These are "shared" with TaskInstanceState in some parts of the code,
+ so please ensure that their values always match the ones with the
+ same name in TaskInstanceState.
+ """
- # set by a task
- QUEUED = "queued"
RUNNING = "running"
SUCCESS = "success"
- SHUTDOWN = "shutdown" # External request to shut down
FAILED = "failed"
- UP_FOR_RETRY = "up_for_retry"
- UP_FOR_RESCHEDULE = "up_for_reschedule"
- UPSTREAM_FAILED = "upstream_failed"
- SKIPPED = "skipped"
- SENSING = "sensing"
-
- task_states = (
- SUCCESS,
- RUNNING,
- FAILED,
- UPSTREAM_FAILED,
- SKIPPED,
- UP_FOR_RETRY,
- UP_FOR_RESCHEDULE,
- QUEUED,
- NONE,
- SCHEDULED,
- SENSING,
- REMOVED,
- )
- dag_states = (
- SUCCESS,
- RUNNING,
- FAILED,
+
+class State:
+ """
+ Static class with task instance state constants and color methods to
+ avoid hardcoding.
+ """
+
+ # Backwards-compat constants for code that does not yet use the enum
+ # These first three are shared by DagRunState and TaskInstanceState
+ SUCCESS = TaskInstanceState.SUCCESS
+ RUNNING = TaskInstanceState.RUNNING
+ FAILED = TaskInstanceState.FAILED
+
+ # These are TaskInstanceState only
+ NONE = None
+ REMOVED = TaskInstanceState.REMOVED
+ SCHEDULED = TaskInstanceState.SCHEDULED
+ QUEUED = TaskInstanceState.QUEUED
+ SHUTDOWN = TaskInstanceState.SHUTDOWN
+ UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
+ UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
+ UPSTREAM_FAILED = TaskInstanceState.UPSTREAM_FAILED
+ SKIPPED = TaskInstanceState.SKIPPED
+ SENSING = TaskInstanceState.SENSING
+
+ task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState)
+
+ dag_states: Tuple[DagRunState, ...] = (
+ DagRunState.SUCCESS,
+ DagRunState.RUNNING,
+ DagRunState.FAILED,
)
- state_color = {
- QUEUED: 'gray',
- RUNNING: 'lime',
- SUCCESS: 'green',
- SHUTDOWN: 'blue',
- FAILED: 'red',
- UP_FOR_RETRY: 'gold',
- UP_FOR_RESCHEDULE: 'turquoise',
- UPSTREAM_FAILED: 'orange',
- SKIPPED: 'pink',
- REMOVED: 'lightgrey',
- SCHEDULED: 'tan',
- NONE: 'lightblue',
- SENSING: 'lightseagreen',
+ state_color: Dict[Optional[TaskInstanceState], str] = {
+ None: 'lightblue',
+ TaskInstanceState.QUEUED: 'gray',
+ TaskInstanceState.RUNNING: 'lime',
+ TaskInstanceState.SUCCESS: 'green',
+ TaskInstanceState.SHUTDOWN: 'blue',
+ TaskInstanceState.FAILED: 'red',
+ TaskInstanceState.UP_FOR_RETRY: 'gold',
+ TaskInstanceState.UP_FOR_RESCHEDULE: 'turquoise',
+ TaskInstanceState.UPSTREAM_FAILED: 'orange',
+ TaskInstanceState.SKIPPED: 'pink',
+ TaskInstanceState.REMOVED: 'lightgrey',
+ TaskInstanceState.SCHEDULED: 'tan',
+ TaskInstanceState.SENSING: 'lightseagreen',
}
state_color.update(STATE_COLORS) # type: ignore
@@ -96,17 +127,17 @@ class State:
return 'white'
return 'black'
- running = frozenset([RUNNING, SENSING])
+ running: FrozenSet[TaskInstanceState] = frozenset([TaskInstanceState.RUNNING, TaskInstanceState.SENSING])
"""
A list of states indicating that a task is being executed.
"""
- finished = frozenset(
+ finished: FrozenSet[TaskInstanceState] = frozenset(
[
- SUCCESS,
- FAILED,
- SKIPPED,
- UPSTREAM_FAILED,
+ TaskInstanceState.SUCCESS,
+ TaskInstanceState.FAILED,
+ TaskInstanceState.SKIPPED,
+ TaskInstanceState.UPSTREAM_FAILED,
]
)
"""
@@ -118,16 +149,16 @@ class State:
case, it is no longer running.
"""
- unfinished = frozenset(
+ unfinished: FrozenSet[Optional[TaskInstanceState]] = frozenset(
[
- NONE,
- SCHEDULED,
- QUEUED,
- RUNNING,
- SENSING,
- SHUTDOWN,
- UP_FOR_RETRY,
- UP_FOR_RESCHEDULE,
+ None,
+ TaskInstanceState.SCHEDULED,
+ TaskInstanceState.QUEUED,
+ TaskInstanceState.RUNNING,
+ TaskInstanceState.SENSING,
+ TaskInstanceState.SHUTDOWN,
+ TaskInstanceState.UP_FOR_RETRY,
+ TaskInstanceState.UP_FOR_RESCHEDULE,
]
)
"""
@@ -135,12 +166,16 @@ class State:
a run or has not even started.
"""
- failed_states = frozenset([FAILED, UPSTREAM_FAILED])
+ failed_states: FrozenSet[TaskInstanceState] = frozenset(
+ [TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED]
+ )
"""
A list of states indicating that a task or dag is a failed state.
"""
- success_states = frozenset([SUCCESS, SKIPPED])
+ success_states: FrozenSet[TaskInstanceState] = frozenset(
+ [TaskInstanceState.SUCCESS, TaskInstanceState.SKIPPED]
+ )
"""
A list of states indicating that a task or dag is a success state.
"""