Posted to commits@airflow.apache.org by "Qian Yu (Jira)" <ji...@apache.org> on 2020/01/07 06:46:00 UTC

[jira] [Created] (AIRFLOW-6497) Scheduler creates DagBag in the same process with outdated info

Qian Yu created AIRFLOW-6497:
--------------------------------

             Summary: Scheduler creates DagBag in the same process with outdated info
                 Key: AIRFLOW-6497
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6497
             Project: Apache Airflow
          Issue Type: Bug
          Components: scheduler
    Affects Versions: 1.10.7
            Reporter: Qian Yu


The following code in scheduler_job.py appears to run in the same process as the scheduler. It creates a DagBag, but because the scheduler is a long-running process, the result does not pick up the latest changes made to DAGs. For example, changes to the retries count, on_failure_callback, newly added tasks, etc. are not reflected.
 
{code:python}
                if ti.try_number == try_number and ti.state == State.QUEUED:
                    msg = ("Executor reports task instance {} finished ({}) "
                           "although the task says its {}. Was the task "
                           "killed externally?".format(ti, state, ti.state))
                    Stats.incr('scheduler.tasks.killed_externally')
                    self.log.error(msg)
                    try:
                        simple_dag = simple_dag_bag.get_dag(dag_id)
                        dagbag = models.DagBag(simple_dag.full_filepath)
                        dag = dagbag.get_dag(dag_id)
                        ti.task = dag.get_task(task_id)
                        ti.handle_failure(msg)
                    except Exception:
                        self.log.error("Cannot load the dag bag to handle failure for %s"
                                       ". Setting task to FAILED without callbacks or "
                                       "retries. Do you have enough resources?", ti)
                        ti.state = State.FAILED
                        session.merge(ti)
                        session.commit()

{code}
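For context, here is a minimal, self-contained sketch (an illustration only, not Airflow code; the custom_operators module name and its contents are hypothetical) of why a DagBag built inside the long-running scheduler process can see stale code: the DAG file itself is re-executed by DagBag, but any module it imports is served from the process's sys.modules cache, so edits to those modules are invisible until the process restarts.

{code:python}
import importlib
import pathlib
import sys
import tempfile

# Illustration only: simulate "editing an operator module after the
# scheduler has started" with a throwaway module in a temp directory.
tmp_dir = pathlib.Path(tempfile.mkdtemp())
mod_path = tmp_dir / "custom_operators.py"      # hypothetical module name
mod_path.write_text("class CustomOperator:\n    pass\n")
sys.path.insert(0, str(tmp_dir))

import custom_operators                          # now cached in sys.modules

# "Deploy" a new version that adds .join, as in the traceback below.
mod_path.write_text("class CustomOperator:\n    join = 'fan-in helper'\n")

# The long-running process still holds the old class: a DAG file that is
# re-imported via models.DagBag(...) binds to this cached module, so a
# reference like task2.join raises AttributeError.
print(hasattr(custom_operators.CustomOperator, "join"))   # False

# Only an explicit reload (or a process restart) picks up the edit.
importlib.reload(custom_operators)
print(hasattr(custom_operators.CustomOperator, "join"))   # True
{code}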


This causes errors such as AttributeError because stale code is hit. For example, when someone adds a .join attribute to CustomOperator without bouncing the scheduler, this is what they would get after a Celery worker timeout error causes this line to be hit:

{code}
[2020-01-05 22:25:45,951] {dagbag.py:207} ERROR - Failed to import: /dags/dag1.py
Traceback (most recent call last):
  File "/lib/python3.6/site-packages/airflow/models/dagbag.py", line 204, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/lib/python3.6/imp.py", line 172, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 684, in _load
  File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/dags/dag1.py", line 280, in <module>
    task1 >> task2.join
AttributeError: 'CustomOperator' object has no attribute 'join'
[2020-01-05 22:25:45,951] {scheduler_job.py:1314} ERROR - Cannot load the dag bag to handle failure for <TaskInstance: dag1.task1 2020-01-02 00:00:00+00:00 [queued]>. Setting task to FAILED without callbacks or retries. Do you have enough resources?
{code}
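As a speculative illustration only (not a proposed patch; build_fresh_dagbag and the module-prefix tuple below are hypothetical, not an existing Airflow API), one way to make the in-process reload see edited helper modules would be to evict them from the module cache before building the DagBag:

{code:python}
import sys

from airflow import models


def build_fresh_dagbag(full_filepath, stale_prefixes=("custom_operators",)):
    """Illustrative sketch: drop cached helper modules so the DagBag
    import below re-reads them from disk. The prefix tuple is a
    hypothetical example; a real fix would need a safer criterion."""
    for name in list(sys.modules):
        if name.startswith(stale_prefixes):
            del sys.modules[name]
    # DagBag re-executes the DAG file; with the helper modules evicted,
    # their imports inside the DAG file are also re-read from disk.
    return models.DagBag(full_filepath)
{code}

Whether evicting modules inside the scheduler is safe is unclear; restarting the scheduler after deploying operator changes, or handling this failure path in a freshly started process, may be more robust.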



--
This message was sent by Atlassian Jira
(v8.3.4#803005)