You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/03 09:38:13 UTC

[GitHub] [airflow] tothandor edited a comment on issue #12199: New dag run / task state: disposed

tothandor edited a comment on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-1003969950


   I have already made a quick-and-dirty patch for that, which introduces an "acknowledged" state, so that the errors already handled can be tracked separately from the ones still left to be done. Anyone familiar with monitoring systems (e.g. CheckMK, Nagios, Zabbix, Zenoss) will find nothing new in this. Each error should either be dismissed or fixed within a given time. In some sense, Airflow is also a monitoring system.
   The color I chose for the acknowledged state is sienna, which lies between green (good, ok) and red (bad, failure, error), but to my eye it still looks like a rather dull red, which suits a healed problem that still shows its presence. Later on, this could help to identify recurring problems, which would require more inspection.
   
   ```python
   --- airflow/settings.py.orig	2021-12-07 15:49:11.960772472 +0100
   +++ airflow/settings.py	2021-12-07 15:30:21.792037118 +0100
   @@ -94,6 +94,7 @@
        "skipped": "pink",
        "scheduled": "tan",
        "deferred": "mediumpurple",
   +    "acknowledged": "sienna",
    }
    
    
   --- airflow/api/common/experimental/mark_tasks.py.orig	2021-11-17 16:31:35.044668390 +0100
   +++ airflow/api/common/experimental/mark_tasks.py	2021-12-07 17:05:20.931619345 +0100
   @@ -71,6 +71,7 @@
        state: str = State.SUCCESS,
        commit: bool = False,
        session=None,
   +    failed: bool = False,
    ):
        """
        Set the state of a task instance and if needed its relatives. Can set state
   @@ -89,6 +90,7 @@
        :param state: State to which the tasks need to be set
        :param commit: Commit tasks to be altered to the database
        :param session: database session
   +    :param failed: Affect only failed task instances
        :return: list of tasks that have been created and updated
        """
        if not tasks:
   @@ -121,6 +123,8 @@
            if sub_dag_run_ids:
                qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
                tis_altered += qry_sub_dag.with_for_update().all()
   +        if failed:
   +            tis_altered = [ti for ti in tis_altered if ti.state == State.FAILED]
            for task_instance in tis_altered:
                task_instance.state = state
                if state in State.finished:
   @@ -131,6 +135,9 @@
            if sub_dag_run_ids:
                qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
                tis_altered += qry_sub_dag.all()
   +        if failed:
   +            tis_altered = [ti for ti in tis_altered if ti.state == State.FAILED]
   +
        return tis_altered
    
    
   @@ -316,6 +323,35 @@
        )
    
    
   +@provide_session
   +def set_dag_run_state_to_acknowledged(dag, execution_date, commit=False, session=None):
   +    """
   +    Set the dag run for a specific execution date and its task instances
   +    to acknowledged.
   +
   +    :param dag: the DAG of which to alter state
   +    :param execution_date: the execution date from which to start looking
   +    :param commit: commit DAG and tasks to be altered to the database
   +    :param session: database session
   +    :return: If commit is true, list of tasks that have been updated,
   +             otherwise list of tasks that will be updated
   +    :raises: ValueError if dag or execution_date is invalid
   +    """
   +    if not dag or not execution_date:
   +        return []
   +
   +    # Mark the dag run to success.
   +    if commit:
   +        _set_dag_run_state(dag.dag_id, execution_date, State.ACKNOWLEDGED, session)
   +
   +    # Mark all failed task instances of the dag run to acknowledged.
   +    for task in dag.tasks:
   +        task.dag = dag
   +    return set_state(
   +        tasks=dag.tasks, execution_date=execution_date, state=State.ACKNOWLEDGED, commit=commit, session=session, failed=True
   +    )
   +
   +
    @provide_session
    def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
        """
   --- airflow/utils/state.py.orig	2021-11-17 16:31:35.240667990 +0100
   +++ airflow/utils/state.py	2021-12-07 15:30:50.023955812 +0100
   @@ -48,6 +48,7 @@
        SKIPPED = "skipped"  # Skipped by branching or some other mechanism
        SENSING = "sensing"  # Smart sensor offloaded to the sensor DAG
        DEFERRED = "deferred"  # Deferrable operator waiting on a trigger
   +    ACKNOWLEDGED = "acknowledged" # task may be failed, but it's okay, it's state should not make noise
    
        def __str__(self) -> str:  # pylint: disable=invalid-str-returned
            return self.value
   @@ -66,6 +67,7 @@
        RUNNING = "running"
        SUCCESS = "success"
        FAILED = "failed"
   +    ACKNOWLEDGED = "acknowledged"
    
        def __str__(self) -> str:
            return self.value
   @@ -96,6 +98,7 @@
        SKIPPED = TaskInstanceState.SKIPPED
        SENSING = TaskInstanceState.SENSING
        DEFERRED = TaskInstanceState.DEFERRED
   +    ACKNOWLEDGED = TaskInstanceState.ACKNOWLEDGED
    
        task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState)
    
   @@ -104,6 +107,7 @@
            DagRunState.SUCCESS,
            DagRunState.RUNNING,
            DagRunState.FAILED,
   +        DagRunState.ACKNOWLEDGED,
        )
    
        state_color: Dict[Optional[TaskInstanceState], str] = {
   @@ -121,6 +125,7 @@
            TaskInstanceState.REMOVED: 'lightgrey',
            TaskInstanceState.SCHEDULED: 'tan',
            TaskInstanceState.DEFERRED: 'mediumpurple',
   +        TaskInstanceState.ACKNOWLEDGED: 'sienna', # neither green, neither red --> red+green = brown
        }
        state_color[TaskInstanceState.SENSING] = state_color[TaskInstanceState.DEFERRED]
        state_color.update(STATE_COLORS)  # type: ignore
   @@ -151,6 +156,7 @@
                TaskInstanceState.FAILED,
                TaskInstanceState.SKIPPED,
                TaskInstanceState.UPSTREAM_FAILED,
   +            TaskInstanceState.ACKNOWLEDGED,
            ]
        )
        """
   --- airflow/www/forms.py.orig	2021-12-07 15:11:39.133274432 +0100
   +++ airflow/www/forms.py	2021-12-07 15:10:42.140438988 +0100
   @@ -162,6 +162,7 @@
                ('running', 'running'),
                ('failed', 'failed'),
                ('up_for_retry', 'up_for_retry'),
   +            ('acknowledged', 'acknowledged'),
            ),
            widget=Select2Widget(),
            validators=[InputRequired()],
   --- airflow/www/views.py.orig	2021-12-07 15:48:39.388866769 +0100
   +++ airflow/www/views.py	2021-12-07 17:32:38.806764466 +0100
   @@ -93,6 +93,7 @@
    from airflow.api.common.experimental.mark_tasks import (
        set_dag_run_state_to_failed,
        set_dag_run_state_to_success,
   +    set_dag_run_state_to_acknowledged,
    )
    from airflow.configuration import AIRFLOW_CONFIG, conf
    from airflow.exceptions import AirflowException
   @@ -4024,6 +4025,33 @@
                flash('Failed to set state', 'error')
            return redirect(self.get_default_url())
    
   +    @action(
   +        'set_acknowledged',
   +        "Set state to 'acknowledged'",
   +        "All failed task instances would also be marked as acknowledged, are you sure?",
   +        single=False,
   +    )
   +    @action_has_dag_edit_access
   +    @provide_session
   +    def action_set_acknowledged(self, drs, session=None):
   +        """Set state to acknowledged."""
   +        try:
   +            count = 0
   +            altered_tis = []
   +            for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])).all():
   +                count += 1
   +                altered_tis += set_dag_run_state_to_acknowledged(
   +                    current_app.dag_bag.get_dag(dr.dag_id), dr.execution_date, commit=True, session=session
   +                )
   +            altered_ti_count = len(altered_tis)
   +            flash(
   +                "{count} dag runs and {altered_ti_count} task instances "
   +                "were set to acknowledged".format(count=count, altered_ti_count=altered_ti_count)
   +            )
   +        except Exception as e:
   +            flash(f'Failed to set state: {e}', 'error')
   +        return redirect(self.get_default_url())
   +
        @action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False)
        @action_has_dag_edit_access
        @provide_session
   @@ -4384,6 +4412,14 @@
            self.update_redirect()
            return redirect(self.get_redirect())
    
   +    @action('set_acknowledged', "Set state to 'acknowledged'", '', single=False)
   +    @action_has_dag_edit_access
   +    def action_set_acknowledged(self, tis):
   +        """Set state to 'acknowledged'"""
   +        self.set_task_instance_state(tis, State.ACKNOWLEDGED)
   +        self.update_redirect()
   +        return redirect(self.get_redirect())
   +
    
    class AutocompleteView(AirflowBaseView):
        """View to provide autocomplete results"""
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org