You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/30 12:03:19 UTC

[airflow] branch master updated: Bugfix: Manual DagRun trigger should not skip scheduled runs (#13963)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new de277c6  Bugfix: Manual DagRun trigger should not skip scheduled runs (#13963)
de277c6 is described below

commit de277c69e7909cf0d563bbd542166397523ebbe0
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Jan 30 12:02:53 2021 +0000

    Bugfix: Manual DagRun trigger should not skip scheduled runs (#13963)
    
    closes https://github.com/apache/airflow/issues/13434
---
 airflow/models/dag.py            |  1 -
 tests/jobs/test_scheduler_job.py | 63 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 17b7243..4a0f5f8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1846,7 +1846,6 @@ class DAG(LoggingMixin):
                 or_(
                     DagRun.run_type == DagRunType.BACKFILL_JOB,
                     DagRun.run_type == DagRunType.SCHEDULED,
-                    DagRun.external_trigger.is_(True),
                 ),
             )
             .group_by(DagRun.dag_id)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 6b63806..595da25 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3595,6 +3595,69 @@ class TestSchedulerJob(unittest.TestCase):
                 "'test_scheduler_create_dag_runs_does_not_raise_error' not found in serialized_dag table"
             ) in log_output.output[0]
 
+    def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self):
+        """
+        Test that externally triggered Dag Runs should not affect (by skipping) next
+        scheduled DAG runs
+        """
+        dag = DAG(
+            dag_id='test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run',
+            start_date=DEFAULT_DATE,
+            schedule_interval="*/1 * * * *",
+            max_active_runs=5,
+            catchup=True,
+        )
+
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+        dag.clear()
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+        dagbag.bag_dag(dag=dag, root_dag=dag)
+        # Write to dag and serialized_dag table
+        dagbag.sync_to_db(session)
+        dag = dagbag.get_dag(dag.dag_id)
+
+        # Verify that dag_model.next_dagrun is equal to next execution_date
+        dag_model = session.query(DagModel).get(dag.dag_id)
+        assert dag_model.next_dagrun == DEFAULT_DATE
+
+        job = SchedulerJob(subdir=os.devnull)
+        job.executor = MockExecutor(do_update=False)
+        job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        # Verify a DagRun is created with the correct execution_date
+        # when Scheduler._do_scheduling is run in the Scheduler Loop
+        job._do_scheduling(session)
+        dr1 = dag.get_dagrun(DEFAULT_DATE, session)
+        assert dr1 is not None
+        assert dr1.state == State.RUNNING
+
+        # Verify that dag_model.next_dagrun is set to next execution_date
+        dag_model = session.query(DagModel).get(dag.dag_id)
+        assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
+
+        # Trigger the Dag externally
+        dr = dag.create_dagrun(
+            state=State.RUNNING,
+            execution_date=timezone.utcnow(),
+            run_type=DagRunType.MANUAL,
+            session=session,
+            external_trigger=True,
+        )
+        assert dr is not None
+        # Run DAG.bulk_write_to_db -- this is run in DagFileProcessor.process_file
+        DAG.bulk_write_to_db([dag], session)
+
+        # Test that 'dag_model.next_dagrun' has not been changed by the newly created, externally
+        # triggered DagRun.
+        dag_model = session.query(DagModel).get(dag.dag_id)
+        assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
+
     def test_do_schedule_max_active_runs_upstream_failed(self):
         """
         Test that tasks in upstream failed don't count as actively running.