You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/03 09:38:13 UTC
[GitHub] [airflow] tothandor edited a comment on issue #12199: New dag run / task state: disposed
tothandor edited a comment on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-1003969950
I have already made a quick-and-dirty patch for this that introduces an "acknowledged" state, so that errors that have already been handled can be tracked separately from those still left to deal with. Anyone familiar with monitoring systems (e.g. CheckMK, Nagios, Zabbix, Zenoss) will recognize this concept: each error should be either dismissed or fixed within a given time. In a sense, Airflow is also a monitoring system.
The color I chose for the acknowledged state is sienna, which lies between green (good, OK) and red (bad, failure, error). To my eye it still looks like a rather dull red. That suits a healed problem whose presence should remain visible. Later on, this could help identify recurring problems that require closer inspection.
```python
--- airflow/settings.py.orig 2021-12-07 15:49:11.960772472 +0100
+++ airflow/settings.py 2021-12-07 15:30:21.792037118 +0100
@@ -94,6 +94,7 @@
"skipped": "pink",
"scheduled": "tan",
"deferred": "mediumpurple",
+ "acknowledged": "sienna",
}
--- airflow/api/common/experimental/mark_tasks.py.orig 2021-11-17 16:31:35.044668390 +0100
+++ airflow/api/common/experimental/mark_tasks.py 2021-12-07 17:05:20.931619345 +0100
@@ -71,6 +71,7 @@
state: str = State.SUCCESS,
commit: bool = False,
session=None,
+ failed: bool = False,
):
"""
Set the state of a task instance and if needed its relatives. Can set state
@@ -89,6 +90,7 @@
:param state: State to which the tasks need to be set
:param commit: Commit tasks to be altered to the database
:param session: database session
+ :param failed: Affect only failed task instances
:return: list of tasks that have been created and updated
"""
if not tasks:
@@ -121,6 +123,8 @@
if sub_dag_run_ids:
qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
tis_altered += qry_sub_dag.with_for_update().all()
+ if failed:
+ tis_altered = [ti for ti in tis_altered if ti.state == State.FAILED]
for task_instance in tis_altered:
task_instance.state = state
if state in State.finished:
@@ -131,6 +135,9 @@
if sub_dag_run_ids:
qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
tis_altered += qry_sub_dag.all()
+ if failed:
+ tis_altered = [ti for ti in tis_altered if ti.state == State.FAILED]
+
return tis_altered
@@ -316,6 +323,35 @@
)
+@provide_session
+def set_dag_run_state_to_acknowledged(dag, execution_date, commit=False, session=None):
+ """
+ Set the dag run for a specific execution date to acknowledged, together
+ with those of its task instances that are currently in the FAILED state.
+
+ :param dag: the DAG of which to alter state
+ :param execution_date: the execution date from which to start looking
+ :param commit: if true, write the dag run and task instance changes to the database
+ :param session: database session
+ :return: If commit is true, list of task instances that have been updated,
+ otherwise list of task instances that would be updated.
+ An empty list is returned (no exception is raised) if dag or
+ execution_date is falsy.
+ """
+ if not dag or not execution_date:
+ return []
+
+ # Mark the dag run itself as acknowledged (only when committing).
+ # NOTE(review): no check of the dag run's current state is made here, so a
+ # dag run that is not failed would also be set to acknowledged -- confirm
+ # whether this should be restricted to failed dag runs.
+ if commit:
+ _set_dag_run_state(dag.dag_id, execution_date, State.ACKNOWLEDGED, session)
+
+ # Mark the dag run's failed task instances as acknowledged: failed=True makes
+ # set_state keep only task instances whose state is FAILED.
+ # Attach the DAG object to every task before handing the tasks to set_state.
+ for task in dag.tasks:
+ task.dag = dag
+ return set_state(
+ tasks=dag.tasks, execution_date=execution_date, state=State.ACKNOWLEDGED, commit=commit, session=session, failed=True
+ )
+
+
@provide_session
def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
"""
--- airflow/utils/state.py.orig 2021-11-17 16:31:35.240667990 +0100
+++ airflow/utils/state.py 2021-12-07 15:30:50.023955812 +0100
@@ -48,6 +48,7 @@
SKIPPED = "skipped" # Skipped by branching or some other mechanism
SENSING = "sensing" # Smart sensor offloaded to the sensor DAG
DEFERRED = "deferred" # Deferrable operator waiting on a trigger
+ ACKNOWLEDGED = "acknowledged" # task may be failed, but it's okay, it's state should not make noise
def __str__(self) -> str: # pylint: disable=invalid-str-returned
return self.value
@@ -66,6 +67,7 @@
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
+ ACKNOWLEDGED = "acknowledged"
def __str__(self) -> str:
return self.value
@@ -96,6 +98,7 @@
SKIPPED = TaskInstanceState.SKIPPED
SENSING = TaskInstanceState.SENSING
DEFERRED = TaskInstanceState.DEFERRED
+ ACKNOWLEDGED = TaskInstanceState.ACKNOWLEDGED
task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState)
@@ -104,6 +107,7 @@
DagRunState.SUCCESS,
DagRunState.RUNNING,
DagRunState.FAILED,
+ DagRunState.ACKNOWLEDGED,
)
state_color: Dict[Optional[TaskInstanceState], str] = {
@@ -121,6 +125,7 @@
TaskInstanceState.REMOVED: 'lightgrey',
TaskInstanceState.SCHEDULED: 'tan',
TaskInstanceState.DEFERRED: 'mediumpurple',
+ TaskInstanceState.ACKNOWLEDGED: 'sienna', # neither green, neither red --> red+green = brown
}
state_color[TaskInstanceState.SENSING] = state_color[TaskInstanceState.DEFERRED]
state_color.update(STATE_COLORS) # type: ignore
@@ -151,6 +156,7 @@
TaskInstanceState.FAILED,
TaskInstanceState.SKIPPED,
TaskInstanceState.UPSTREAM_FAILED,
+ TaskInstanceState.ACKNOWLEDGED,
]
)
"""
--- airflow/www/forms.py.orig 2021-12-07 15:11:39.133274432 +0100
+++ airflow/www/forms.py 2021-12-07 15:10:42.140438988 +0100
@@ -162,6 +162,7 @@
('running', 'running'),
('failed', 'failed'),
('up_for_retry', 'up_for_retry'),
+ ('acknowledged', 'acknowledged'),
),
widget=Select2Widget(),
validators=[InputRequired()],
--- airflow/www/views.py.orig 2021-12-07 15:48:39.388866769 +0100
+++ airflow/www/views.py 2021-12-07 17:32:38.806764466 +0100
@@ -93,6 +93,7 @@
from airflow.api.common.experimental.mark_tasks import (
set_dag_run_state_to_failed,
set_dag_run_state_to_success,
+ set_dag_run_state_to_acknowledged,
)
from airflow.configuration import AIRFLOW_CONFIG, conf
from airflow.exceptions import AirflowException
@@ -4024,6 +4025,33 @@
flash('Failed to set state', 'error')
return redirect(self.get_default_url())
+ @action(
+ 'set_acknowledged',
+ "Set state to 'acknowledged'",
+ "All failed task instances would also be marked as acknowledged, are you sure?",
+ single=False,
+ )
+ @action_has_dag_edit_access
+ @provide_session
+ def action_set_acknowledged(self, drs, session=None):
+ """
+ Set the selected dag runs, and their failed task instances, to acknowledged.
+
+ Re-queries the selected dag runs by id and calls
+ set_dag_run_state_to_acknowledged(commit=True) for each of them, then
+ flashes how many dag runs and task instances were affected.
+ Any exception is reported as an error flash message instead of being raised.
+ """
+ try:
+ # NOTE(review): count is incremented even when get_dag() returns None; in
+ # that case set_dag_run_state_to_acknowledged returns [] without changing
+ # anything, so the flashed dag run count may overstate what was acknowledged.
+ count = 0
+ altered_tis = []
+ for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])).all():
+ count += 1
+ altered_tis += set_dag_run_state_to_acknowledged(
+ current_app.dag_bag.get_dag(dr.dag_id), dr.execution_date, commit=True, session=session
+ )
+ altered_ti_count = len(altered_tis)
+ flash(
+ "{count} dag runs and {altered_ti_count} task instances "
+ "were set to acknowledged".format(count=count, altered_ti_count=altered_ti_count)
+ )
+ # Broad catch: any failure is shown to the user as an error flash message.
+ except Exception as e:
+ flash(f'Failed to set state: {e}', 'error')
+ return redirect(self.get_default_url())
+
@action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False)
@action_has_dag_edit_access
@provide_session
@@ -4384,6 +4412,14 @@
self.update_redirect()
return redirect(self.get_redirect())
+ @action('set_acknowledged', "Set state to 'acknowledged'", '', single=False)
+ @action_has_dag_edit_access
+ def action_set_acknowledged(self, tis):
+ """
+ Set the selected task instances to 'acknowledged'.
+
+ Unlike the dag-run action, no FAILED-only filter is applied here: every
+ selected task instance is passed to set_task_instance_state.
+ """
+ self.set_task_instance_state(tis, State.ACKNOWLEDGED)
+ self.update_redirect()
+ return redirect(self.get_redirect())
+
class AutocompleteView(AirflowBaseView):
"""View to provide autocomplete results"""
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org