Posted to commits@airflow.apache.org by "Qian Yu (Jira)" <ji...@apache.org> on 2020/01/07 06:46:00 UTC
[jira] [Created] (AIRFLOW-6497) Scheduler creates DagBag in the same process with outdated info
Qian Yu created AIRFLOW-6497:
--------------------------------
Summary: Scheduler creates DagBag in the same process with outdated info
Key: AIRFLOW-6497
URL: https://issues.apache.org/jira/browse/AIRFLOW-6497
Project: Apache Airflow
Issue Type: Bug
Components: scheduler
Affects Versions: 1.10.7
Reporter: Qian Yu
The following code in scheduler_job.py appears to run in the same process as the scheduler, and it creates a DagBag there. Because the scheduler is a long-running process, that DagBag does not pick up the latest changes made to DAGs. For example, changes to the retry count, on_failure_callback, newly added tasks, etc. are not reflected.
{code:python}
# The executor says the task finished, but the task instance is still QUEUED.
if ti.try_number == try_number and ti.state == State.QUEUED:
    msg = ("Executor reports task instance {} finished ({}) "
           "although the task says its {}. Was the task "
           "killed externally?".format(ti, state, ti.state))
    Stats.incr('scheduler.tasks.killed_externally')
    self.log.error(msg)
    try:
        # Re-parse the DAG file here, inside the scheduler process,
        # to get the task object that handle_failure() needs.
        simple_dag = simple_dag_bag.get_dag(dag_id)
        dagbag = models.DagBag(simple_dag.full_filepath)
        dag = dagbag.get_dag(dag_id)
        ti.task = dag.get_task(task_id)
        ti.handle_failure(msg)
    except Exception:
        self.log.error("Cannot load the dag bag to handle failure for %s"
                       ". Setting task to FAILED without callbacks or "
                       "retries. Do you have enough resources?", ti)
        ti.state = State.FAILED
        session.merge(ti)
        session.commit()
{code}
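A minimal, Airflow-free sketch of how a long-running process can end up with stale values, assuming the DAG file takes its settings from a separately imported module. `load_dag_file`, `my_operators.py`, and `my_dag.py` are hypothetical stand-ins; the real DagBag uses `imp.load_source`, per the traceback below. Re-executing the DAG file in the same process re-runs the file itself, but its `import` statements are answered from `sys.modules`:

```python
import importlib
import importlib.util
import os
import sys
import tempfile


def load_dag_file(path, mod_name):
    """Hypothetical stand-in for imp.load_source: run a DAG file as a new module.

    The file itself executes again on every call, but any module it imports
    is looked up in sys.modules first and is not re-executed if cached.
    """
    spec = importlib.util.spec_from_file_location(mod_name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def demo():
    workdir = tempfile.mkdtemp()
    op_path = os.path.join(workdir, "my_operators.py")
    dag_path = os.path.join(workdir, "my_dag.py")
    with open(op_path, "w") as f:
        f.write("DEFAULT_RETRIES = 0\n")
    with open(dag_path, "w") as f:
        f.write("from my_operators import DEFAULT_RETRIES\nretries = DEFAULT_RETRIES\n")

    sys.path.insert(0, workdir)
    importlib.invalidate_caches()
    try:
        # First parse: imports my_operators and caches it in sys.modules.
        before = load_dag_file(dag_path, "my_dag").retries
        # The shared module is edited while this "scheduler" keeps running.
        with open(op_path, "w") as f:
            f.write("DEFAULT_RETRIES = 3\n")
        # Second parse in the same process: the cached module is reused.
        after = load_dag_file(dag_path, "my_dag").retries
    finally:
        sys.path.remove(workdir)
        sys.modules.pop("my_operators", None)
    return before, after
```

Here `demo()` returns `(0, 0)`: the file on disk now says 3, but the re-parsed DAG still sees 0 until the process restarts.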
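This causes errors such as AttributeError when stale code is hit. For example, if someone adds a .join attribute to CustomOperator without restarting the scheduler, this is what they get after a CeleryWorker timeout error causes the code above to run. The DAG file itself is re-executed, but the module defining CustomOperator was presumably already imported by the scheduler process and is reused from Python's module cache (sys.modules), so the new attribute is missing: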
{code}
[2020-01-05 22:25:45,951] {dagbag.py:207} ERROR - Failed to import: /dags/dag1.py
Traceback (most recent call last):
File "/lib/python3.6/site-packages/airflow/models/dagbag.py", line 204, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/lib/python3.6/imp.py", line 172, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 684, in _load
File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/dags/dag1.py", line 280, in <module>
task1 >> task2.join
AttributeError: 'CustomOperator' object has no attribute 'join'
[2020-01-05 22:25:45,951] {scheduler_job.py:1314} ERROR - Cannot load the dag bag to handle failure for <TaskInstance: dag1.task1 2020-01-02 00:00:00+00:00 [queued]>. Setting task to FAILED without callbacks or retries. Do you have enough resou
{code}
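One possible direction, sketched here as an assumption and not as Airflow's actual fix, is to parse the DAG file in a fresh interpreter instead of inside the scheduler process, so that modules such as the one defining CustomOperator are imported from disk again. `parse_in_fresh_process`, `CHILD_SCRIPT`, and the toy files are hypothetical:

```python
import os
import subprocess
import sys
import tempfile

# Hypothetical child program (not an Airflow API): executes the DAG file
# given as its first argument in a brand-new interpreter.
CHILD_SCRIPT = "import runpy, sys\nrunpy.run_path(sys.argv[1])\n"


def parse_in_fresh_process(dag_path, module_dir):
    """Return (ok, stderr) after running dag_path in a child Python process."""
    env = dict(os.environ)
    paths = [module_dir]
    if env.get("PYTHONPATH"):
        paths.append(env["PYTHONPATH"])
    env["PYTHONPATH"] = os.pathsep.join(paths)
    proc = subprocess.run(
        [sys.executable, "-c", CHILD_SCRIPT, dag_path],
        env=env,
        capture_output=True,
        text=True,
    )
    return proc.returncode == 0, proc.stderr


def demo():
    workdir = tempfile.mkdtemp()
    # Current code on disk: CustomOperator has .join and the DAG file uses it.
    with open(os.path.join(workdir, "custom_operator.py"), "w") as f:
        f.write("class CustomOperator:\n    def join(self):\n        return 'joined'\n")
    dag_path = os.path.join(workdir, "dag1.py")
    with open(dag_path, "w") as f:
        f.write(
            "from custom_operator import CustomOperator\n"
            "task2 = CustomOperator()\n"
            "task2.join\n"
        )
    return parse_in_fresh_process(dag_path, workdir)
```

The trade-off is one interpreter start-up per parse, in exchange for never seeing modules that the long-running parent has cached.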
--
This message was sent by Atlassian Jira
(v8.3.4#803005)