Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/19 23:38:37 UTC

[GitHub] [airflow] n-oden commented on issue #14319: tasks with `none_failed` trigger rule do not run when all upstream tasks are skipped

n-oden commented on issue #14319:
URL: https://github.com/apache/airflow/issues/14319#issuecomment-782457157


   As it turns out, this was fixed at some point between 1.10.12 and HEAD, but the fix has not been released yet.
   
   Whatever is going on here definitely does not happen in all cases: the simplest possible test case for skip behavior appears to function as expected.
   
   The issue was in https://github.com/apache/airflow/blob/1.10.12/airflow/sensors/base_sensor_operator.py#L112-L114:
   
   ```python
   # BaseSensorOperator.execute() in 1.10.12, once the sensor has timed out
   if self.soft_fail and not context['ti'].is_eligible_to_retry():
       self._do_skip_downstream_tasks(context)
       raise AirflowSkipException('Snap. Time is OUT.')
   ```
   
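   Any sensor that inherited from BaseSensorOperator and had `soft_fail` set to `True` would raise `AirflowSkipException`, but only after _manually_ skipping _all_ downstream tasks in `_do_skip_downstream_tasks()`. Because those tasks were set to skipped directly, their own trigger rules (such as `none_failed`) were never evaluated: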
   
   ```python
   def _do_skip_downstream_tasks(self, context):
       # get_flat_relatives(upstream=False) returns every transitive
       # downstream task, not just the sensor's direct children
       downstream_tasks = context['task'].get_flat_relatives(upstream=False)
       self.log.debug("Downstream task_ids %s", downstream_tasks)
       if downstream_tasks:
           self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
   ```
   
   This appears to have been fixed in HEAD: at https://github.com/apache/airflow/blob/master/airflow/sensors/base.py#L233-L236 only the exception is raised, and the `_do_skip_downstream_tasks()` function no longer exists.
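To make the difference concrete, here is a minimal, hypothetical simulation in plain Python (it does not import Airflow). The DAG, the task names, and the `downstream_of()`, `evaluate()` and `run()` helpers are illustrative assumptions, and the trigger-rule logic only approximates `all_success` and `none_failed`. It contrasts the 1.10.12 path, where every descendant of the timed-out sensor is skipped up front, with the HEAD path, where the sensor only raises `AirflowSkipException` and each downstream task's trigger rule decides what happens.

```python
from collections import deque

# Hypothetical toy DAG (not Airflow code): sensor -> process -> join <- other
UPSTREAM = {
    "sensor": [],
    "other": [],
    "process": ["sensor"],
    "join": ["process", "other"],
}
TRIGGER_RULE = {"process": "all_success", "join": "none_failed"}
ORDER = ["sensor", "other", "process", "join"]  # a topological order


def downstream_of(task):
    """Every transitive downstream task, analogous to get_flat_relatives(upstream=False)."""
    children = {t: [] for t in UPSTREAM}
    for t, ups in UPSTREAM.items():
        for u in ups:
            children[u].append(t)
    seen, queue = set(), deque(children[task])
    while queue:
        t = queue.popleft()
        if t not in seen:
            seen.add(t)
            queue.extend(children[t])
    return seen


def evaluate(task, states):
    """Simplified trigger-rule check; assumes a task that gets to run succeeds."""
    ups = [states[u] for u in UPSTREAM[task]]
    rule = TRIGGER_RULE.get(task, "all_success")
    if "failed" in ups or "upstream_failed" in ups:
        return "upstream_failed"
    if rule == "all_success" and "skipped" in ups:
        return "skipped"
    return "success"  # all_success with successful parents, or none_failed


def run(skip_all_downstream):
    # The sensor timed out with soft_fail=True; "other" succeeded normally.
    states = {"sensor": "skipped", "other": "success"}
    if skip_all_downstream:
        # 1.10.12-style: mark every descendant skipped before any rule is checked
        for t in downstream_of("sensor"):
            states[t] = "skipped"
    for t in ORDER:
        if t not in states:
            states[t] = evaluate(t, states)
    return states


print(run(skip_all_downstream=True)["join"])   # skipped
print(run(skip_all_downstream=False)["join"])  # success
```

In the first run the `none_failed` task is skipped without its rule ever being consulted, which matches the symptom in this issue; in the second, `process` is skipped by its `all_success` rule while `join` still runs.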
   
   As a side note, if you look at https://github.com/apache/airflow/blob/master/airflow/models/taskinstance.py#L1142-L1157, the `_run_raw_task()` method is essentially one very large try/except block that attempts to run the task and then handles each possible exception in turn. However, the exception handler for `AirflowSkipException` _does not return_, so control falls through past the try/except block and continues at L1182, where the task is marked as success. You can actually see this in our earlier log snippet:
   
   ```
   [2021-02-12 00:32:18,130] {taskinstance.py:1025} INFO - Marking task as SKIPPED.dag_id=pipeline_v1, task_id=check_for_late_data, execution_date=20210211T003000, start_date=20210212T003017, end_date=
   [2021-02-12 00:32:18,130] {taskinstance.py:1070} INFO - Marking task as SUCCESS.dag_id=pipeline_v1, task_id=check_for_late_data, execution_date=20210211T003000, start_date=20210212T003017, end_date=20210212T003218
   ```
   
   I suspect there should be a `raise` or a `return` at the end of that handler.
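The fall-through can be reproduced with a toy function that mirrors only the control-flow shape described for `_run_raw_task()`; it is a sketch, not Airflow's implementation. `run_raw_task()`, its `return_after_skip` flag, and the local `AirflowSkipException` stand-in are hypothetical. With `return_after_skip=False` it records the same SKIPPED-then-SUCCESS pair seen in the log; returning from the handler, as suggested, leaves the task skipped.

```python
class AirflowSkipException(Exception):
    """Local stand-in for airflow.exceptions.AirflowSkipException (hypothetical)."""


def run_raw_task(execute, return_after_skip, log):
    """Toy model of a try/except whose skip handler may or may not return."""
    try:
        execute()
    except AirflowSkipException:
        state = "skipped"
        log.append("Marking task as SKIPPED.")
        if return_after_skip:
            # Suggested fix: leave here so "skipped" is the final state.
            return state
    # Without an early return, a skipped task also falls through to here.
    state = "success"
    log.append("Marking task as SUCCESS.")
    return state


def timed_out_sensor():
    raise AirflowSkipException("Snap. Time is OUT.")


buggy_log = []
print(run_raw_task(timed_out_sensor, return_after_skip=False, log=buggy_log))  # success
print(buggy_log)  # ['Marking task as SKIPPED.', 'Marking task as SUCCESS.']

fixed_log = []
print(run_raw_task(timed_out_sensor, return_after_skip=True, log=fixed_log))  # skipped
```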


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org