Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/26 13:32:59 UTC

[airflow] branch master updated: Only compare updated time when Serialized DAG exists (#13899)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8958d12  Only compare updated time when Serialized DAG exists (#13899)
8958d12 is described below

commit 8958d125cd4ac9e58d706d75be3eb88d591199cd
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Tue Jan 26 13:32:33 2021 +0000

    Only compare updated time when Serialized DAG exists (#13899)
    
    closes https://github.com/apache/airflow/issues/13667
    
    The following error occurs when a Serialized DAG still exists in the Webserver or Scheduler but its row has just been removed from the serialized_dag table,
    usually because the DAG file was removed.
    
    ```
    Traceback (most recent call last):
      File "/home/app/.pyenv/versions/3.8.1/envs/airflow-py381/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
        self._run_scheduler_loop()
      File "/home/app/.pyenv/versions/3.8.1/envs/airflow-py381/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
        num_queued_tis = self._do_scheduling(session)
      File "/home/app/.pyenv/versions/3.8.1/envs/airflow-py381/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1516, in _do_scheduling
        self._schedule_dag_run(dag_run, active_runs_by_dag_id.get(dag_run.dag_id, set()), session)
      File "/home/app/.pyenv/versions/3.8.1/envs/airflow-py381/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1629, in _schedule_dag_run
        dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
      File "/home/app/.pyenv/versions/3.8.1/envs/airflow-py381/lib/python3.8/site-packages/airflow/utils/session.py", line 62, in wrapper
        return func(*args, **kwargs)
      File "/home/app/.pyenv/versions/3.8.1/envs/airflow-py381/lib/python3.8/site-packages/airflow/models/dagbag.py", line 187, in get_dag
        if sd_last_updated_datetime > self.dags_last_fetched[dag_id]
    ```
    
    A simple fix is to check that `sd_last_updated_datetime` is not `None`, i.e. that a Serialized DAG still exists for that dag_id, before comparing it.
---
 airflow/models/dagbag.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 2637255..f1ae55c 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -184,7 +184,7 @@ class DagBag(LoggingMixin):
                     dag_id=dag_id,
                     session=session,
                 )
-                if sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
+                if sd_last_updated_datetime and sd_last_updated_datetime > self.dags_last_fetched[dag_id]:
                     self._add_dag_from_db(dag_id=dag_id, session=session)
 
             return self.dags.get(dag_id)
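
The guard added in this change can be illustrated outside Airflow with a minimal sketch. It assumes that the lookup of the last-updated time yields `None` when no row exists in serialized_dag for the dag_id, which is what the commit message implies. The helper `needs_refresh` and its parameter names are hypothetical and are not part of Airflow's API. In Python 3, ordering `None` against a `datetime` raises `TypeError`, which is presumably the failure behind the traceback above.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def needs_refresh(sd_last_updated: Optional[datetime], last_fetched: datetime) -> bool:
    """Hypothetical helper: True only if a serialized row exists and is newer."""
    # None stands in for "no serialized_dag row for this dag_id" (assumption).
    # The truthiness check short-circuits, so None is never compared.
    return bool(sd_last_updated and sd_last_updated > last_fetched)


now = datetime.now(timezone.utc)

# Unguarded comparison, mirroring the line removed by the diff.
try:
    _ = None > now
    unguarded_error = None
except TypeError as exc:
    unguarded_error = exc

# Guarded comparison, mirroring the line added by the diff.
missing_row = needs_refresh(None, now)                         # row was deleted
newer_row = needs_refresh(now + timedelta(seconds=1), now)     # row updated since fetch
older_row = needs_refresh(now - timedelta(seconds=1), now)     # cached copy is current
```

With the guard in place, a missing row simply skips the refresh, and in the patched `get_dag` the method then falls through to `return self.dags.get(dag_id)`.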