Posted to commits@airflow.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/03/06 19:49:00 UTC

[jira] [Commented] (AIRFLOW-6497) Scheduler creates DagBag in the same process with outdated info

    [ https://issues.apache.org/jira/browse/AIRFLOW-6497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17053728#comment-17053728 ] 

ASF GitHub Bot commented on AIRFLOW-6497:
-----------------------------------------

mik-laj commented on pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597
 
 
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Scheduler creates DagBag in the same process with outdated info
> ---------------------------------------------------------------
>
>                 Key: AIRFLOW-6497
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6497
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.10.7
>            Reporter: Qian Yu
>            Priority: Major
>
> The following code in scheduler_job.py appears to run in the same process as the scheduler. It creates a DagBag, but because the scheduler is a long-running process, that DagBag does not pick up the latest changes made to DAGs. For example, changes to the retries count, on_failure_callback, newly added tasks, etc. are not reflected.
>  
> {code:python}
>                 if ti.try_number == try_number and ti.state == State.QUEUED:
>                     msg = ("Executor reports task instance {} finished ({}) "
>                            "although the task says its {}. Was the task "
>                            "killed externally?".format(ti, state, ti.state))
>                     Stats.incr('scheduler.tasks.killed_externally')
>                     self.log.error(msg)
>                     try:
>                         simple_dag = simple_dag_bag.get_dag(dag_id)
>                         dagbag = models.DagBag(simple_dag.full_filepath)
>                         dag = dagbag.get_dag(dag_id)
>                         ti.task = dag.get_task(task_id)
>                         ti.handle_failure(msg)
>                     except Exception:
>                         self.log.error("Cannot load the dag bag to handle failure for %s"
>                                        ". Setting task to FAILED without callbacks or "
>                                        "retries. Do you have enough resources?", ti)
>                         ti.state = State.FAILED
>                         session.merge(ti)
>                         session.commit()
> {code}
> This causes errors such as AttributeError because stale code is hit. For example, if someone adds a .join attribute to CustomOperator without bouncing the scheduler, this is what they would see after a Celery worker timeout error causes this line to be hit:
> {code}
> [2020-01-05 22:25:45,951] {dagbag.py:207} ERROR - Failed to import: /dags/dag1.py
> Traceback (most recent call last):
>   File "/lib/python3.6/site-packages/airflow/models/dagbag.py", line 204, in process_file
>     m = imp.load_source(mod_name, filepath)
>   File "/usr/lib/python3.6/imp.py", line 172, in load_source
>     module = _load(spec)
>   File "<frozen importlib._bootstrap>", line 684, in _load
>   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
>   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
>   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
>   File "/dags/dag1.py", line 280, in <module>
>     task1 >> task2.join
> AttributeError: 'CustomOperator' object has no attribute 'join'
> [2020-01-05 22:25:45,951] {scheduler_job.py:1314} ERROR - Cannot load the dag bag to handle failure for <TaskInstance: dag1.task1 2020-01-02 00:00:00+00:00 [queued]>. Setting task to FAILED without callbacks or retries. Do you have enough resources?
> {code}
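>  
> To make the stale-code behaviour easier to see, here is a minimal standalone sketch (not Airflow code; the /dags/dag1.py path and the my_plugins module name are hypothetical) of why re-parsing a DAG file inside the long-running scheduler process can still execute against an outdated operator class: the DAG file itself is re-read from disk, but the modules it imports stay cached in sys.modules.
> {code:python}
> import importlib.util
>
> def load_dag_file(path):
>     """Re-execute a DAG file from disk, roughly what imp.load_source
>     does in the traceback above."""
>     spec = importlib.util.spec_from_file_location("dag_module", path)
>     module = importlib.util.module_from_spec(spec)
>     spec.loader.exec_module(module)  # re-runs the DAG file's top-level code
>     return module
>
> # First parse: /dags/dag1.py does `from my_plugins import CustomOperator`,
> # so my_plugins is imported and cached in sys.modules of this process.
> load_dag_file("/dags/dag1.py")
>
> # my_plugins.py is later edited on disk to give CustomOperator a `join`
> # attribute, and dag1.py is edited to use `task1 >> task2.join`.
>
> # Second parse in the SAME long-running process: dag1.py is re-read, but the
> # cached my_plugins module is reused, its CustomOperator still has no `join`,
> # and the parse fails with the AttributeError shown above.
> load_dag_file("/dags/dag1.py")
> {code}
> Only a fresh interpreter (or an explicit importlib.reload of the cached module) sees the new attribute, which is why parsing DAG files in short-lived child processes rather than in the scheduler loop itself avoids this class of error.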



--
This message was sent by Atlassian Jira
(v8.3.4#803005)