Posted to commits@airflow.apache.org by "Qian Yu (Jira)" <ji...@apache.org> on 2020/01/07 06:49:00 UTC

[jira] [Commented] (AIRFLOW-6497) Scheduler creates DagBag in the same process with outdated info

    [ https://issues.apache.org/jira/browse/AIRFLOW-6497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17009401#comment-17009401 ] 

Qian Yu commented on AIRFLOW-6497:
----------------------------------

[~bolke] [~potiuk] [~kamil.bregula] any idea / suggestions?

> Scheduler creates DagBag in the same process with outdated info
> ---------------------------------------------------------------
>
>                 Key: AIRFLOW-6497
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6497
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.10.7
>            Reporter: Qian Yu
>            Priority: Major
>
> The following code in scheduler_job.py seems to be called in the same process as the scheduler. It creates a DagBag. But since the scheduler is a long-running process, it does not pick up the latest changes made to DAGs. For example, changes to the retries count, on_failure_callback, newly added tasks, etc. are not reflected.
>  
> {code:python}
>                 if ti.try_number == try_number and ti.state == State.QUEUED:
>                     msg = ("Executor reports task instance {} finished ({}) "
>                            "although the task says its {}. Was the task "
>                            "killed externally?".format(ti, state, ti.state))
>                     Stats.incr('scheduler.tasks.killed_externally')
>                     self.log.error(msg)
>                     try:
>                         simple_dag = simple_dag_bag.get_dag(dag_id)
>                         dagbag = models.DagBag(simple_dag.full_filepath)
>                         dag = dagbag.get_dag(dag_id)
>                         ti.task = dag.get_task(task_id)
>                         ti.handle_failure(msg)
>                     except Exception:
>                         self.log.error("Cannot load the dag bag to handle failure for %s"
>                                        ". Setting task to FAILED without callbacks or "
>                                        "retries. Do you have enough resources?", ti)
>                         ti.state = State.FAILED
>                         session.merge(ti)
>                         session.commit()
> {code}
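>
> To illustrate the stale-code part: Python caches imported modules in sys.modules, so even though the DagBag call above re-parses the DAG file itself, any operator module that the DAG file imports keeps the definition loaded when the scheduler process started. A minimal, self-contained sketch of that caching behaviour (the custom_operator module and CustomOperator class here are made up for illustration, not taken from any real project):
> {code:python}
> import importlib
> import pathlib
> import sys
>
> sys.path.insert(0, ".")
> plugin = pathlib.Path("custom_operator.py")
>
> # Version 1 of the "plugin": CustomOperator has no .join attribute.
> plugin.write_text("class CustomOperator:\n    pass\n")
> import custom_operator
> print(hasattr(custom_operator.CustomOperator(), "join"))  # False
>
> # The file is edited on disk while this process keeps running.
> plugin.write_text(
>     "class CustomOperator:\n    def join(self):\n        return 'joined'\n")
>
> # Importing again is a no-op: sys.modules still holds the old module object,
> # which is exactly what a long-running scheduler process sees as well.
> import custom_operator
> print(hasattr(custom_operator.CustomOperator(), "join"))  # still False
>
> # Only an explicit reload (or a brand-new process) picks up the new code.
> importlib.reload(custom_operator)
> print(hasattr(custom_operator.CustomOperator(), "join"))  # True
> {code}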
> This causes errors such as AttributeError because stale code is hit. For example, when someone adds a .join attribute to CustomOperator without bouncing the scheduler, this is what they would get after a CeleryWorker timeout error causes this line to be hit:
> {code}
> [2020-01-05 22:25:45,951] {dagbag.py:207} ERROR - Failed to import: /dags/dag1.py
> Traceback (most recent call last):
>   File "/lib/python3.6/site-packages/airflow/models/dagbag.py", line 204, in process_file
>     m = imp.load_source(mod_name, filepath)
>   File "/usr/lib/python3.6/imp.py", line 172, in load_source
>     module = _load(spec)
>   File "<frozen importlib._bootstrap>", line 684, in _load
>   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
>   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
>   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
>   File "/dags/dag1.py", line 280, in <module>
>     task1 >> task2.join
> AttributeError: 'CustomOperator' object has no attribute 'join'
> [2020-01-05 22:25:45,951] {scheduler_job.py:1314} ERROR - Cannot load the dag bag to handle failure for <TaskInstance: dag1.task1 2020-01-02 00:00:00+00:00 [queued]>. Setting task to FAILED without callbacks or retries. Do you have enough resou
> {code}
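>
> One direction for a fix, sketched here purely as an illustration (this is not the actual Airflow implementation; the helper names and the use of multiprocessing are assumptions), would be to do the failure-handling parse in a short-lived child process, so the DAG file and its operator modules are imported from the code currently on disk rather than from whatever is cached in the scheduler:
> {code:python}
> import multiprocessing
>
>
> def _parse_in_child(filepath, dag_id, task_id, queue):
>     # Runs with a fresh interpreter state ("spawn"), so the DAG file and the
>     # operator modules it imports are loaded from the code currently on disk.
>     from airflow import models
>     dagbag = models.DagBag(filepath)
>     dag = dagbag.get_dag(dag_id)
>     queue.put(dag is not None and dag.has_task(task_id))
>
>
> def task_loads_with_fresh_code(filepath, dag_id, task_id, timeout=60):
>     ctx = multiprocessing.get_context("spawn")
>     queue = ctx.Queue()
>     proc = ctx.Process(target=_parse_in_child,
>                        args=(filepath, dag_id, task_id, queue))
>     proc.start()
>     proc.join(timeout)
>     return (not queue.empty()) and queue.get()
> {code}
> With something along these lines, the scheduler could decide whether calling ti.handle_failure with a freshly loaded task is possible, instead of silently setting the task to FAILED without callbacks or retries.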



--
This message was sent by Atlassian Jira
(v8.3.4#803005)