You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Pramiti (JIRA)" <ji...@apache.org> on 2018/09/13 04:05:00 UTC

[jira] [Commented] (AIRFLOW-2229) Scheduler cannot retry abrupt task failures within factory-generated DAGs

    [ https://issues.apache.org/jira/browse/AIRFLOW-2229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612982#comment-16612982 ] 

Pramiti commented on AIRFLOW-2229:
----------------------------------

We are also facing the same issue. If we change the worker queue, the same dag starts to run. What is wrong with worker ?

> Scheduler cannot retry abrupt task failures within factory-generated DAGs
> -------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2229
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2229
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.9.0
>            Reporter: James Meickle
>            Priority: Major
>
> We had an issue where one of our tasks failed without the worker updating state (unclear why, but let's assume it was an OOM), resulting in this series of error messages:
> {{Mar 20 14:27:05 airflow-core-i-0fc1f995414837b8b.stg.int.dynoquant.com airflow_scheduler-stdout.log: [2018-03-20 14:27:04,993] \{{models.py:1595 ERROR - Executor reports task instance %s finished (%s) although the task says its %s. Was the task killed externally?}}}}
> {{ Mar 20 14:27:05 airflow-core-i-0fc1f995414837b8b.stg.int.dynoquant.com airflow_scheduler-stdout.log: NoneType}}
> {{ Mar 20 14:27:05 airflow-core-i-0fc1f995414837b8b.stg.int.dynoquant.com airflow_scheduler-stdout.log: [2018-03-20 14:27:04,994] {{jobs.py:1435 ERROR - Cannot load the dag bag to handle failure for <TaskInstance: nightly_dataload.dummy_operator 2018-03-19 00:00:00 [queued]>. Setting task to FAILED without callbacks or retries. Do you have enough resources?}}}}
> Mysterious failures are not unexpected, because we are in the cloud, after all. The concern is the last line: ignoring callbacks and retries, implying that it's a lack of resources. However, the machine was totally underutilized at the time.
> I dug into this code a bit more and as far as I can tell this error is happening in this code path: [https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/jobs.py#L1427]
> {{self.log.error(msg)}}
>  {{try:}}
>  {{    simple_dag = simple_dag_bag.get_dag(dag_id)}}
>  {{    dagbag = models.DagBag(simple_dag.full_filepath)}}
>  {{    dag = dagbag.get_dag(dag_id)}}
>  {{    ti.task = dag.get_task(task_id)}}
>  {{    ti.handle_failure(msg)}}
>  {{except Exception:}}
>  {{    self.log.error("Cannot load the dag bag to handle failure for %s"}}
>  {{    ". Setting task to FAILED without callbacks or "}}
>  {{    "retries. Do you have enough resources?", ti)}}
>  {{    ti.state = State.FAILED}}
>  {{    session.merge(ti)}}
>  {{    session.commit()}}{{}}
> I am not very familiar with this code, nor do I have time to attach a debugger at the moment, but I think what is happening here is:
>  * I have a factory Python file, which imports and instantiates DAG code from other files.
>  * The scheduler loads the DAGs from the factory file on the filesystem. It gets a fileloc (as represented in the DB) not of the factory file, but of the file it loaded code from.
>  * The scheduler makes a simple DAGBag from the instantiated DAGs.
>  * This line of code uses the simple DAG, which references the original DAG object's fileloc, to create a new DAGBag object.
>  * This DAGBag looks for the original DAG in the fileloc, which is the file containing that DAG's _code_, but is not actually importable by Airflow.
>  * An exception is raised trying to load the DAG from the DAGBag, which found nothing.
>  * Handling of the task failure never occurs.
>  * The over-broad Exception code swallows all of the above occurring.
>  * There's just a generic error message that is not helpful to a system operator.
> If this is the case, at minimum, the try/except should be rewritten to be more graceful and to have a better error message. But I question whether this level of DAGBag abstraction/indirection isn't making this failure case worse than it needs to be; under normal conditions the scheduler is definitely able to find the relevant factory-generated DAGs and execute tasks within them as expected, even with the fileloc set to the code path and not the import path.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)