You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/09 14:02:55 UTC

[GitHub] [airflow] billcrook opened a new issue #12199: New dag run / task state: disposed

billcrook opened a new issue #12199:
URL: https://github.com/apache/airflow/issues/12199


   **Description**
   
   Introduce a new dag run / task state called "disposed". This new state would represent the acknowledgment of a failed run/task that should not be retried. Update the UI and CLI to provide a mechanism for the disposal action.
   
   **Use case / motivation**
   
   In a former gig we had a homegrown job management system. One nice feature for operations was the ability to "dispose" of a failed job. Disposal indicated that we recognized the failure, investigated it and decided the job should not be retried and the failure could be ignored going forward. This removed the failure from our daily operations report which we used to investigate failures.  I find myself yearning for this feature lately. In airflow I have to either re-run the job or mark it as successful for it to leave my daily operations failure report. This could simply be implemented as a new dag/task state and action. We also had a "dispose reason" for tracking purposes - just a notes field for why the operator performed the action. Since we dealt with financial transaction feeds, we needed this. The auditability of a dispose state + notes field would be quite useful.
   
   
   **Related Issues**
   
   None that I see.
   
   
   Tagging @ryw 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Anurag-Shetty commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
Anurag-Shetty commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-923827798


   I was able to  create state called "warning" with below changes.
   1. Created with execption called AirflowWarningException in exceptions.py
   2. Made some changes in utils/state.py(adding new state), models/taskinstance.py, ti_deps/deps/trigger_rule_dep.py.
   3. I raising the exception in my custom bash operator based on type of failure.
   ![image](https://user-images.githubusercontent.com/50450873/134150750-5153228c-82e4-47f9-8dcd-a868a6aba455.png)
   
   
   Let me know your thoughts and any impacts from this approach.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Anurag-Shetty commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
Anurag-Shetty commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-926524666


   if you meant that its error but not failure. pipeline should continue to execute next task acknowledging the failure. then Yes, its can be named as acknowledged more general use.  We could keep different color. 
   Wanted to understand  if there any implications of adding new state?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ryw commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
ryw commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-737293617


   @thcidale0808 yes would be great :)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] thcidale0808 commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
thcidale0808 commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-740060934


   @turbaszek , based on that email thread, I guess this will be implemented by Ace Haidrey teams, so in this case I'll look for another issue to contribute. :)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] boring-cyborg[bot] commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-724032370


   Thanks for opening your first issue here! Be sure to follow the issue template!
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] billcrook commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
billcrook commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-926669588


   > pipeline should continue to execute next task acknowledging the failure.
   
   The original intent of this ticket is to acknowledge that a task has failed. I don't believe the pipeline should continue to execute the next task in the failed dag run.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek edited a comment on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
turbaszek edited a comment on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-737339533


   > I didn't remember that we had a dev thread on the topic.
   > 
   
   No worry, I think it would be good to refresh this thread as the approach proposed there gives even more auditability.
   
   > A concrete implementation would help us resolve any lingering fuzziness around the feature.
   
   I agree, but it's good if the person who works on it knows that the effort is proof of concept, not something that was agreed upon 😄 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ryw commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
ryw commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-724073326


   As I mentioned in the Slack thread, I think this is a good step towards improved auditability for Airflow. 
   
   Maybe bundle this together with a few other audit improvements for a 2.X release.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-724087533


   Love the feature (and I've been thinking on and off about this for 2 years. Just never thinking enough to open an issue) -- not sure of the name is the only thing.
   
   "Acknowledged Failure" -- or perhaps to make the task dep checking easier, the state could be left as failure, but somewhere else we record "yes, this failure has been investigated, it's 'okay'".
   
   Anyway, :100:  to the feature idea.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] tothandor commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
tothandor commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-1003969950


   I have already made a quick'n'dirty patch for that, that introduces an "acknowledge" state, to be able to track the errors already handled, and the ones left to be done. If someone is familiar with monitoring systems (e.g. CheckMK, Nagios, Zabbix, Zenoss), then this won't be new for him/her. Each error should either be dismissed or fixed within a time. In some sense Airflow is also a monitoring system.
   The color I chose for acknowledged state is sienna, which is between green (good, ok), and red (bad, failure, error), but it's still looks rather dull red, according to my perception, to mark a healed problem, which still marks its presence. Later on this could help to identify a returning problems, which would requires more inspection.
   
   ```python
   --- airflow/settings.py.orig	2021-12-07 15:49:11.960772472 +0100
   +++ airflow/settings.py	2021-12-07 15:30:21.792037118 +0100
   @@ -94,6 +94,7 @@
        "skipped": "pink",
        "scheduled": "tan",
        "deferred": "mediumpurple",
   +    "acknowledged": "sienna",
    }
    
    
   --- airflow/api/common/experimental/mark_tasks.py.orig	2021-11-17 16:31:35.044668390 +0100
   +++ airflow/api/common/experimental/mark_tasks.py	2021-12-07 17:05:20.931619345 +0100
   @@ -71,6 +71,7 @@
        state: str = State.SUCCESS,
        commit: bool = False,
        session=None,
   +    failed: bool = False,
    ):
        """
        Set the state of a task instance and if needed its relatives. Can set state
   @@ -89,6 +90,7 @@
        :param state: State to which the tasks need to be set
        :param commit: Commit tasks to be altered to the database
        :param session: database session
   +    :param failed: Affect only failed task instances
        :return: list of tasks that have been created and updated
        """
        if not tasks:
   @@ -121,6 +123,8 @@
            if sub_dag_run_ids:
                qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
                tis_altered += qry_sub_dag.with_for_update().all()
   +        if failed:
   +            tis_altered = [ti for ti in tis_altered if ti.state == State.FAILED]
            for task_instance in tis_altered:
                task_instance.state = state
                if state in State.finished:
   @@ -131,6 +135,9 @@
            if sub_dag_run_ids:
                qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
                tis_altered += qry_sub_dag.all()
   +        if failed:
   +            tis_altered = [ti for ti in tis_altered if ti.state == State.FAILED]
   +
        return tis_altered
    
    
   @@ -316,6 +323,35 @@
        )
    
    
   +@provide_session
   +def set_dag_run_state_to_acknowledged(dag, execution_date, commit=False, session=None):
   +    """
   +    Set the dag run for a specific execution date and its task instances
   +    to acknowledged.
   +
   +    :param dag: the DAG of which to alter state
   +    :param execution_date: the execution date from which to start looking
   +    :param commit: commit DAG and tasks to be altered to the database
   +    :param session: database session
   +    :return: If commit is true, list of tasks that have been updated,
   +             otherwise list of tasks that will be updated
   +    :raises: ValueError if dag or execution_date is invalid
   +    """
   +    if not dag or not execution_date:
   +        return []
   +
   +    # Mark the dag run to success.
   +    if commit:
   +        _set_dag_run_state(dag.dag_id, execution_date, State.ACKNOWLEDGED, session)
   +
   +    # Mark all failed task instances of the dag run to acknowledged.
   +    for task in dag.tasks:
   +        task.dag = dag
   +    return set_state(
   +        tasks=dag.tasks, execution_date=execution_date, state=State.ACKNOWLEDGED, commit=commit, session=session, failed=True
   +    )
   +
   +
    @provide_session
    def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
        """
   --- airflow/utils/state.py.orig	2021-11-17 16:31:35.240667990 +0100
   +++ airflow/utils/state.py	2021-12-07 15:30:50.023955812 +0100
   @@ -48,6 +48,7 @@
        SKIPPED = "skipped"  # Skipped by branching or some other mechanism
        SENSING = "sensing"  # Smart sensor offloaded to the sensor DAG
        DEFERRED = "deferred"  # Deferrable operator waiting on a trigger
   +    ACKNOWLEDGED = "acknowledged" # task may be failed, but it's okay, it's state should not make noise
    
        def __str__(self) -> str:  # pylint: disable=invalid-str-returned
            return self.value
   @@ -66,6 +67,7 @@
        RUNNING = "running"
        SUCCESS = "success"
        FAILED = "failed"
   +    ACKNOWLEDGED = "acknowledged"
    
        def __str__(self) -> str:
            return self.value
   @@ -96,6 +98,7 @@
        SKIPPED = TaskInstanceState.SKIPPED
        SENSING = TaskInstanceState.SENSING
        DEFERRED = TaskInstanceState.DEFERRED
   +    ACKNOWLEDGED = TaskInstanceState.ACKNOWLEDGED
    
        task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState)
    
   @@ -104,6 +107,7 @@
            DagRunState.SUCCESS,
            DagRunState.RUNNING,
            DagRunState.FAILED,
   +        DagRunState.ACKNOWLEDGED,
        )
    
        state_color: Dict[Optional[TaskInstanceState], str] = {
   @@ -121,6 +125,7 @@
            TaskInstanceState.REMOVED: 'lightgrey',
            TaskInstanceState.SCHEDULED: 'tan',
            TaskInstanceState.DEFERRED: 'mediumpurple',
   +        TaskInstanceState.ACKNOWLEDGED: 'sienna', # neither green, neither red --> red+green = brown
        }
        state_color[TaskInstanceState.SENSING] = state_color[TaskInstanceState.DEFERRED]
        state_color.update(STATE_COLORS)  # type: ignore
   @@ -151,6 +156,7 @@
                TaskInstanceState.FAILED,
                TaskInstanceState.SKIPPED,
                TaskInstanceState.UPSTREAM_FAILED,
   +            TaskInstanceState.ACKNOWLEDGED,
            ]
        )
        """
   --- airflow/www/forms.py.orig	2021-12-07 15:11:39.133274432 +0100
   +++ airflow/www/forms.py	2021-12-07 15:10:42.140438988 +0100
   @@ -162,6 +162,7 @@
                ('running', 'running'),
                ('failed', 'failed'),
                ('up_for_retry', 'up_for_retry'),
   +            ('acknowledged', 'acknowledged'),
            ),
            widget=Select2Widget(),
            validators=[InputRequired()],
   --- airflow/www/views.py.orig	2021-12-07 15:48:39.388866769 +0100
   +++ airflow/www/views.py	2021-12-07 17:32:38.806764466 +0100
   @@ -93,6 +93,7 @@
    from airflow.api.common.experimental.mark_tasks import (
        set_dag_run_state_to_failed,
        set_dag_run_state_to_success,
   +    set_dag_run_state_to_acknowledged,
    )
    from airflow.configuration import AIRFLOW_CONFIG, conf
    from airflow.exceptions import AirflowException
   @@ -4024,6 +4025,33 @@
                flash('Failed to set state', 'error')
            return redirect(self.get_default_url())
    
   +    @action(
   +        'set_acknowledged',
   +        "Set state to 'acknowledged'",
   +        "All failed task instances would also be marked as acknowledged, are you sure?",
   +        single=False,
   +    )
   +    @action_has_dag_edit_access
   +    @provide_session
   +    def action_set_acknowledged(self, drs, session=None):
   +        """Set state to acknowledged."""
   +        try:
   +            count = 0
   +            altered_tis = []
   +            for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])).all():
   +                count += 1
   +                altered_tis += set_dag_run_state_to_acknowledged(
   +                    current_app.dag_bag.get_dag(dr.dag_id), dr.execution_date, commit=True, session=session
   +                )
   +            altered_ti_count = len(altered_tis)
   +            flash(
   +                "{count} dag runs and {altered_ti_count} task instances "
   +                "were set to acknowledged".format(count=count, altered_ti_count=altered_ti_count)
   +            )
   +        except Exception as e:
   +            flash(f'Failed to set state: {e}', 'error')
   +        return redirect(self.get_default_url())
   +
        @action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False)
        @action_has_dag_edit_access
        @provide_session
   @@ -4384,6 +4412,14 @@
            self.update_redirect()
            return redirect(self.get_redirect())
    
   +    @action('set_acknowledged', "Set state to 'acknowledged'", '', single=False)
   +    @action_has_dag_edit_access
   +    def action_set_acknowledged(self, tis):
   +        """Set state to 'acknowledged'"""
   +        self.set_task_instance_state(tis, State.ACKNOWLEDGED)
   +        self.update_redirect()
   +        return redirect(self.get_redirect())
   +
    
    class AutocompleteView(AirflowBaseView):
        """View to provide autocomplete results"""
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-737339533


   > I didn't remember that we had a dev thread on the topic.
   > 
   > A concrete implementation would help us resolve any lingering fuzziness around the feature.
   
   No worry, I think it would be good to refresh this thread as the approach proposed there gives even more auditability.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] bbovenzi commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-926483381


   @Anurag-Shetty Nice work!
   Is `warning` the same as `acknowledged`?
   Also, the color seems to be too similar to `upstream_failed`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-740511576


   @thcidale0808 I would suggest you to send a email in this thread so others know that you would be interested in helping make this feature real 😉 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ldacey commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
ldacey commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-913773871


   This would be perfect for tasks where failure is acceptable or predicted. Ideally this would be another state so it would not look red on the main Airflow page (I have been sharing my screen before and people have asked why there are so many failures for certain schedules, and I have to explain that failure is acceptable etc.). 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] tothandor edited a comment on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
tothandor edited a comment on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-1003969950


   I have already made a quick'n'dirty patch for that, that introduces an "acknowledge" state, to be able to track the errors already handled, and the ones left to be done. If someone is familiar with monitoring systems (e.g. CheckMK, Nagios, Zabbix, Zenoss), then this won't be new for him/her. Each error should either be dismissed or fixed within a time. In some sense Airflow is also a monitoring system.
   The color I chose for acknowledged state is sienna, which is between green (good, ok), and red (bad, failure, error), but it's still looks rather dull red, according to my perception, to mark a healed problem, which still shows its presence. Later on this could help to identify a returning problems, which would requires more inspection.
   
   ```python
   --- airflow/settings.py.orig	2021-12-07 15:49:11.960772472 +0100
   +++ airflow/settings.py	2021-12-07 15:30:21.792037118 +0100
   @@ -94,6 +94,7 @@
        "skipped": "pink",
        "scheduled": "tan",
        "deferred": "mediumpurple",
   +    "acknowledged": "sienna",
    }
    
    
   --- airflow/api/common/experimental/mark_tasks.py.orig	2021-11-17 16:31:35.044668390 +0100
   +++ airflow/api/common/experimental/mark_tasks.py	2021-12-07 17:05:20.931619345 +0100
   @@ -71,6 +71,7 @@
        state: str = State.SUCCESS,
        commit: bool = False,
        session=None,
   +    failed: bool = False,
    ):
        """
        Set the state of a task instance and if needed its relatives. Can set state
   @@ -89,6 +90,7 @@
        :param state: State to which the tasks need to be set
        :param commit: Commit tasks to be altered to the database
        :param session: database session
   +    :param failed: Affect only failed task instances
        :return: list of tasks that have been created and updated
        """
        if not tasks:
   @@ -121,6 +123,8 @@
            if sub_dag_run_ids:
                qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
                tis_altered += qry_sub_dag.with_for_update().all()
   +        if failed:
   +            tis_altered = [ti for ti in tis_altered if ti.state == State.FAILED]
            for task_instance in tis_altered:
                task_instance.state = state
                if state in State.finished:
   @@ -131,6 +135,9 @@
            if sub_dag_run_ids:
                qry_sub_dag = all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates)
                tis_altered += qry_sub_dag.all()
   +        if failed:
   +            tis_altered = [ti for ti in tis_altered if ti.state == State.FAILED]
   +
        return tis_altered
    
    
   @@ -316,6 +323,35 @@
        )
    
    
   +@provide_session
   +def set_dag_run_state_to_acknowledged(dag, execution_date, commit=False, session=None):
   +    """
   +    Set the dag run for a specific execution date and its task instances
   +    to acknowledged.
   +
   +    :param dag: the DAG of which to alter state
   +    :param execution_date: the execution date from which to start looking
   +    :param commit: commit DAG and tasks to be altered to the database
   +    :param session: database session
   +    :return: If commit is true, list of tasks that have been updated,
   +             otherwise list of tasks that will be updated
   +    :raises: ValueError if dag or execution_date is invalid
   +    """
   +    if not dag or not execution_date:
   +        return []
   +
   +    # Mark the dag run to success.
   +    if commit:
   +        _set_dag_run_state(dag.dag_id, execution_date, State.ACKNOWLEDGED, session)
   +
   +    # Mark all failed task instances of the dag run to acknowledged.
   +    for task in dag.tasks:
   +        task.dag = dag
   +    return set_state(
   +        tasks=dag.tasks, execution_date=execution_date, state=State.ACKNOWLEDGED, commit=commit, session=session, failed=True
   +    )
   +
   +
    @provide_session
    def set_dag_run_state_to_failed(dag, execution_date, commit=False, session=None):
        """
   --- airflow/utils/state.py.orig	2021-11-17 16:31:35.240667990 +0100
   +++ airflow/utils/state.py	2021-12-07 15:30:50.023955812 +0100
   @@ -48,6 +48,7 @@
        SKIPPED = "skipped"  # Skipped by branching or some other mechanism
        SENSING = "sensing"  # Smart sensor offloaded to the sensor DAG
        DEFERRED = "deferred"  # Deferrable operator waiting on a trigger
   +    ACKNOWLEDGED = "acknowledged" # task may be failed, but it's okay, it's state should not make noise
    
        def __str__(self) -> str:  # pylint: disable=invalid-str-returned
            return self.value
   @@ -66,6 +67,7 @@
        RUNNING = "running"
        SUCCESS = "success"
        FAILED = "failed"
   +    ACKNOWLEDGED = "acknowledged"
    
        def __str__(self) -> str:
            return self.value
   @@ -96,6 +98,7 @@
        SKIPPED = TaskInstanceState.SKIPPED
        SENSING = TaskInstanceState.SENSING
        DEFERRED = TaskInstanceState.DEFERRED
   +    ACKNOWLEDGED = TaskInstanceState.ACKNOWLEDGED
    
        task_states: Tuple[Optional[TaskInstanceState], ...] = (None,) + tuple(TaskInstanceState)
    
   @@ -104,6 +107,7 @@
            DagRunState.SUCCESS,
            DagRunState.RUNNING,
            DagRunState.FAILED,
   +        DagRunState.ACKNOWLEDGED,
        )
    
        state_color: Dict[Optional[TaskInstanceState], str] = {
   @@ -121,6 +125,7 @@
            TaskInstanceState.REMOVED: 'lightgrey',
            TaskInstanceState.SCHEDULED: 'tan',
            TaskInstanceState.DEFERRED: 'mediumpurple',
   +        TaskInstanceState.ACKNOWLEDGED: 'sienna', # neither green, neither red --> red+green = brown
        }
        state_color[TaskInstanceState.SENSING] = state_color[TaskInstanceState.DEFERRED]
        state_color.update(STATE_COLORS)  # type: ignore
   @@ -151,6 +156,7 @@
                TaskInstanceState.FAILED,
                TaskInstanceState.SKIPPED,
                TaskInstanceState.UPSTREAM_FAILED,
   +            TaskInstanceState.ACKNOWLEDGED,
            ]
        )
        """
   --- airflow/www/forms.py.orig	2021-12-07 15:11:39.133274432 +0100
   +++ airflow/www/forms.py	2021-12-07 15:10:42.140438988 +0100
   @@ -162,6 +162,7 @@
                ('running', 'running'),
                ('failed', 'failed'),
                ('up_for_retry', 'up_for_retry'),
   +            ('acknowledged', 'acknowledged'),
            ),
            widget=Select2Widget(),
            validators=[InputRequired()],
   --- airflow/www/views.py.orig	2021-12-07 15:48:39.388866769 +0100
   +++ airflow/www/views.py	2021-12-07 17:32:38.806764466 +0100
   @@ -93,6 +93,7 @@
    from airflow.api.common.experimental.mark_tasks import (
        set_dag_run_state_to_failed,
        set_dag_run_state_to_success,
   +    set_dag_run_state_to_acknowledged,
    )
    from airflow.configuration import AIRFLOW_CONFIG, conf
    from airflow.exceptions import AirflowException
   @@ -4024,6 +4025,33 @@
                flash('Failed to set state', 'error')
            return redirect(self.get_default_url())
    
   +    @action(
   +        'set_acknowledged',
   +        "Set state to 'acknowledged'",
   +        "All failed task instances would also be marked as acknowledged, are you sure?",
   +        single=False,
   +    )
   +    @action_has_dag_edit_access
   +    @provide_session
   +    def action_set_acknowledged(self, drs, session=None):
   +        """Set state to acknowledged."""
   +        try:
   +            count = 0
   +            altered_tis = []
   +            for dr in session.query(DagRun).filter(DagRun.id.in_([dagrun.id for dagrun in drs])).all():
   +                count += 1
   +                altered_tis += set_dag_run_state_to_acknowledged(
   +                    current_app.dag_bag.get_dag(dr.dag_id), dr.execution_date, commit=True, session=session
   +                )
   +            altered_ti_count = len(altered_tis)
   +            flash(
   +                "{count} dag runs and {altered_ti_count} task instances "
   +                "were set to acknowledged".format(count=count, altered_ti_count=altered_ti_count)
   +            )
   +        except Exception as e:
   +            flash(f'Failed to set state: {e}', 'error')
   +        return redirect(self.get_default_url())
   +
        @action('clear', "Clear the state", "All task instances would be cleared, are you sure?", single=False)
        @action_has_dag_edit_access
        @provide_session
   @@ -4384,6 +4412,14 @@
            self.update_redirect()
            return redirect(self.get_redirect())
    
   +    @action('set_acknowledged', "Set state to 'acknowledged'", '', single=False)
   +    @action_has_dag_edit_access
   +    def action_set_acknowledged(self, tis):
   +        """Set state to 'acknowledged'"""
   +        self.set_task_instance_state(tis, State.ACKNOWLEDGED)
   +        self.update_redirect()
   +        return redirect(self.get_redirect())
   +
    
    class AutocompleteView(AirflowBaseView):
        """View to provide autocomplete results"""
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
turbaszek commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-737334103


   @ryw does implementing this feature means that this discussion reached consensus?
   https://lists.apache.org/thread.html/r14e23de7b71f61e3e81b11eb84da2612938982c46c9824e884d8ba50%40%3Cdev.airflow.apache.org%3E


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ryw commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
ryw commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-737336393


   I didn't remember that we had a dev thread on the topic.
   
   A concrete implementation would help us resolve any lingering fuzziness around the feature.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] thcidale0808 commented on issue #12199: New dag run / task state: disposed

Posted by GitBox <gi...@apache.org>.
thcidale0808 commented on issue #12199:
URL: https://github.com/apache/airflow/issues/12199#issuecomment-724707278


   @ashb can I contribute with the implementation of this feature?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org