Posted to dev@airflow.apache.org by "colins@thinknear.com" <co...@thinknear.com> on 2017/11/17 01:58:53 UTC

SubDagOperator custom on_retry_callback handling For 1.9.0

Hello Airflow Devs, 

This is a continuation of this mailing thread: 

https://lists.apache.org/thread.html/25f07715b834a0e8b70b9e39fad8b82771fb267c33da484f15e61c3e@%3Cdev.airflow.apache.org%3E

We were able to figure out why our SubDagOperator task instances would go from FAILED -> UP_FOR_RETRY -> FAILED. 

As described in the thread above, we have a custom on_retry_callback function that sets all of the task instances within the Subdag to UP_FOR_RETRY. Some other process (unknown to us at the time) would then set the state of these task instances back to FAILED, so when the next iteration of the Subdag ran, it could not execute the Subdag tasks because they were already FAILED. 
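For reference, here is a rough sketch of that callback (not our exact code; the names are placeholders and it assumes the Airflow 1.9 model APIs). It looks up the Subdag's task instances for the current execution_date and flips them to UP_FOR_RETRY:

    from airflow import settings
    from airflow.models import TaskInstance
    from airflow.utils.state import State

    def reset_subdag_tasks_on_retry(context):
        # The context belongs to the SubDagOperator's own task instance;
        # the subdag itself is registered under "<parent_dag_id>.<task_id>".
        ti = context['ti']
        subdag_id = '{}.{}'.format(ti.dag_id, ti.task_id)

        session = settings.Session()
        sub_tis = (session.query(TaskInstance)
                   .filter(TaskInstance.dag_id == subdag_id,
                           TaskInstance.execution_date == ti.execution_date)
                   .all())
        for sub_ti in sub_tis:
            sub_ti.state = State.UP_FOR_RETRY
            session.merge(sub_ti)
        session.commit()
        session.close()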

Here is what we found:

- A SubDagOperator has its own DagRun entry. Whenever the root task instance inside the Subdag ends up UPSTREAM_FAILED or FAILED, that DagRun is set to FAILED. Since all of our Subdag tasks are configured with 0 retries, nothing inside the Subdag continues to run.
- The SubDagOperator itself has retries set to 2, so on failure it calls our on_retry_callback, which sets all of the Subdag's task instances to UP_FOR_RETRY.
- In `jobs.py`, the SchedulerJob class has a method called `_change_state_for_tis_without_dagrun()` that changes the state of the task instances belonging to a DagRun whose state is not RUNNING.
- Because the Subdag's DagRun was already set to FAILED, `_change_state_for_tis_without_dagrun()` subsequently marks the Subdag task instances as FAILED again.
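To illustrate the DagRun point, here is roughly how we confirmed that the Subdag's own DagRun row was FAILED while its task instances were still UP_FOR_RETRY (a minimal sketch; 'parent_dag' and 'my_subdag' are placeholder names, and the Subdag is registered under the dag_id 'parent_dag.my_subdag'):

    from airflow import settings
    from airflow.models import DagRun, TaskInstance

    def inspect_subdag_state(execution_date):
        subdag_id = 'parent_dag.my_subdag'
        session = settings.Session()

        # The SubDagOperator's run shows up as its own DagRun row.
        dr = (session.query(DagRun)
              .filter(DagRun.dag_id == subdag_id,
                      DagRun.execution_date == execution_date)
              .first())
        print('subdag DagRun state: %s' % dr.state)  # FAILED in our case

        # Its task instances are keyed by the same subdag dag_id.
        tis = (session.query(TaskInstance)
               .filter(TaskInstance.dag_id == subdag_id,
                       TaskInstance.execution_date == execution_date)
               .all())
        for ti in tis:
            # UP_FOR_RETRY right after our callback, FAILED after the
            # next scheduler loop runs.
            print('%s: %s' % (ti.task_id, ti.state))
        session.close()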

To work around this, we changed the on_retry_callback handler so that it also sets the Subdag's DagRun state back to RUNNING, which prevents the scheduler from failing the task instances again. Our retries now run fine. 
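Concretely, the workaround looks something like the sketch below (same assumptions and placeholder names as above): in the same on_retry_callback we also flip the Subdag's DagRun back to RUNNING so that `_change_state_for_tis_without_dagrun()` leaves the UP_FOR_RETRY task instances alone:

    from airflow import settings
    from airflow.models import DagRun
    from airflow.utils.state import State

    def mark_subdag_run_running(context):
        ti = context['ti']
        subdag_id = '{}.{}'.format(ti.dag_id, ti.task_id)

        session = settings.Session()
        dr = (session.query(DagRun)
              .filter(DagRun.dag_id == subdag_id,
                      DagRun.execution_date == ti.execution_date)
              .first())
        if dr is not None:
            # Put the subdag's DagRun back into RUNNING so the scheduler
            # no longer flips the UP_FOR_RETRY task instances to FAILED.
            dr.state = State.RUNNING
            session.merge(dr)
            session.commit()
        session.close()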

We have a couple questions here:

1) This may be an edge case, but our workaround feels like a somewhat roundabout way to manage it.  The comment in the code states the following:

                # Handle cases where a DAG run state is set (perhaps manually) to
                # a non-running state. Handle task instances that belong to
                # DAG runs in those states

                # If a task instance is up for retry but the corresponding DAG run
                # isn't running, mark the task instance as FAILED so we don't try
                # to re-run it.
                self._change_state_for_tis_without_dagrun(simple_dag_bag,
                                                          [State.UP_FOR_RETRY],
                                                          State.FAILED)

There appears to be an issue, which we hit because of the on_retry_callback, around a SubDag both having its own DagRun and also being a task in the parent DAG.  Would it make more sense for a SubDag's DagRun not to be subject to this condition (the condition identified in the comment)?

2) We have not been able to figure out why our old on_retry_callback worked fine in our testing environment (which doesn't actively run DAGs unless we're testing) but not in our production environment, which runs DAGs constantly.  We replicated our sandbox environment to behave exactly like production and scheduled DAGs to run, but haven't reproduced the behavior above.  Any thoughts on why this might behave differently in some cases?  

Thanks,

Colin