You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this rendering.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/30 12:03:19 UTC
[airflow] branch master updated: Bugfix: Manual DagRun trigger should not skip scheduled runs (#13963)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new de277c6 Bugfix: Manual DagRun trigger should not skip scheduled runs (#13963)
de277c6 is described below
commit de277c69e7909cf0d563bbd542166397523ebbe0
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Jan 30 12:02:53 2021 +0000
Bugfix: Manual DagRun trigger should not skip scheduled runs (#13963)
closes https://github.com/apache/airflow/issues/13434
---
airflow/models/dag.py | 1 -
tests/jobs/test_scheduler_job.py | 63 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 63 insertions(+), 1 deletion(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 17b7243..4a0f5f8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1846,7 +1846,6 @@ class DAG(LoggingMixin):
or_(
DagRun.run_type == DagRunType.BACKFILL_JOB,
DagRun.run_type == DagRunType.SCHEDULED,
- DagRun.external_trigger.is_(True),
),
)
.group_by(DagRun.dag_id)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 6b63806..595da25 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3595,6 +3595,69 @@ class TestSchedulerJob(unittest.TestCase):
"'test_scheduler_create_dag_runs_does_not_raise_error' not found in serialized_dag table"
) in log_output.output[0]
+ def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self):
+ """
+ Test that an externally triggered (manual) DagRun does not move
+ dag_model.next_dagrun, i.e. it does not cause scheduled runs to be skipped.
+ """
+ dag = DAG(
+ dag_id='test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run',
+ start_date=DEFAULT_DATE,
+ schedule_interval="*/1 * * * *",
+ max_active_runs=5,
+ catchup=True,
+ )
+
+ DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+ session = settings.Session()
+ dag.clear()
+ dagbag = DagBag(
+ dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+ include_examples=False,
+ read_dags_from_db=True,
+ )
+ dagbag.bag_dag(dag=dag, root_dag=dag)
+ # Persist the DAG to the dag and serialized_dag tables
+ dagbag.sync_to_db(session)
+ dag = dagbag.get_dag(dag.dag_id)
+
+ # With no runs yet, next_dagrun should equal the DAG's start_date (DEFAULT_DATE)
+ dag_model = session.query(DagModel).get(dag.dag_id)
+ assert dag_model.next_dagrun == DEFAULT_DATE
+
+ job = SchedulerJob(subdir=os.devnull)
+ job.executor = MockExecutor(do_update=False)
+ job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+ # One scheduler-loop pass (SchedulerJob._do_scheduling) should create
+ # a RUNNING DagRun for the first scheduled execution_date, DEFAULT_DATE
+ job._do_scheduling(session)
+ dr1 = dag.get_dagrun(DEFAULT_DATE, session)
+ assert dr1 is not None
+ assert dr1.state == State.RUNNING
+
+ # next_dagrun should advance by one schedule interval ("*/1" => one minute)
+ dag_model = session.query(DagModel).get(dag.dag_id)
+ assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
+
+ # Trigger the DAG manually (run_type=MANUAL, external_trigger=True, execution_date=now)
+ dr = dag.create_dagrun(
+ state=State.RUNNING,
+ execution_date=timezone.utcnow(),
+ run_type=DagRunType.MANUAL,
+ session=session,
+ external_trigger=True,
+ )
+ assert dr is not None
+ # Run DAG.bulk_write_to_db, as DagFileProcessor.process_file does
+ DAG.bulk_write_to_db([dag], session)
+
+ # The manual run must not be treated as the latest scheduled run, so
+ # 'dag_model.next_dagrun' must still be DEFAULT_DATE + 1 minute.
+ dag_model = session.query(DagModel).get(dag.dag_id)
+ assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
+
def test_do_schedule_max_active_runs_upstream_failed(self):
"""
Test that tasks in upstream failed don't count as actively running.