You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ta...@apache.org on 2023/02/18 20:46:05 UTC
[airflow] branch main updated: Use dagrun instead of DagFileProcessor in backfill_job test (#28724)
This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d20300018a Use dagrun instead of DagFileProcessor in backfill_job test (#28724)
d20300018a is described below
commit d20300018a38159f5452ae16bc9df90b1e7270e5
Author: Kwon Soonmok <tn...@gmail.com>
AuthorDate: Sun Feb 19 05:45:57 2023 +0900
Use dagrun instead of DagFileProcessor in backfill_job test (#28724)
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
Co-authored-by: Andrey Anshin <An...@taragol.is>
---
tests/jobs/test_backfill_job.py | 21 +++++++--------------
1 file changed, 7 insertions(+), 14 deletions(-)
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 614221a4ff..022ca5e8c3 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -153,31 +153,24 @@ class TestBackfillJob:
assert State.SUCCESS == dag_run.state
- @pytest.mark.xfail(condition=True, reason="This test is flaky")
@pytest.mark.backend("postgres", "mysql")
- def test_trigger_controller_dag(self):
+ def test_trigger_controller_dag(self, session):
dag = self.dagbag.get_dag("example_trigger_controller_dag")
target_dag = self.dagbag.get_dag("example_trigger_target_dag")
target_dag.sync_to_db()
- # dag_file_processor = DagFileProcessor(dag_ids=[], log=Mock())
- task_instances_list = []
- # task_instances_list = dag_file_processor._process_task_instances(
- # target_dag,
- # dag_runs=DagRun.find(dag_id='example_trigger_target_dag')
- # )
- assert not task_instances_list
+ target_dag_run = session.query(DagRun).filter(DagRun.dag_id == target_dag.dag_id).one_or_none()
+ assert target_dag_run is None
job = BackfillJob(
dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_first_depends_on_past=True
)
job.run()
- task_instances_list = []
- # task_instances_list = dag_file_processor._process_task_instances(
- # target_dag,
- # dag_runs=DagRun.find(dag_id='example_trigger_target_dag')
- # )
+ dag_run = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).one_or_none()
+ assert dag_run is not None
+
+ task_instances_list = job._task_instances_for_dag_run(dag=dag, dag_run=dag_run)
assert task_instances_list