You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ta...@apache.org on 2023/02/18 20:46:05 UTC

[airflow] branch main updated: Use dagrun instead of DagFileProcessor in backfill_job test (#28724)

This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d20300018a Use dagrun instead of DagFileProcessor in backfill_job test (#28724)
d20300018a is described below

commit d20300018a38159f5452ae16bc9df90b1e7270e5
Author: Kwon Soonmok <tn...@gmail.com>
AuthorDate: Sun Feb 19 05:45:57 2023 +0900

    Use dagrun instead of DagFileProcessor in backfill_job test (#28724)
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    Co-authored-by: Andrey Anshin <An...@taragol.is>
---
 tests/jobs/test_backfill_job.py | 21 +++++++--------------
 1 file changed, 7 insertions(+), 14 deletions(-)

diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index 614221a4ff..022ca5e8c3 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -153,31 +153,24 @@ class TestBackfillJob:
 
         assert State.SUCCESS == dag_run.state
 
-    @pytest.mark.xfail(condition=True, reason="This test is flaky")
     @pytest.mark.backend("postgres", "mysql")
-    def test_trigger_controller_dag(self):
+    def test_trigger_controller_dag(self, session):
         dag = self.dagbag.get_dag("example_trigger_controller_dag")
         target_dag = self.dagbag.get_dag("example_trigger_target_dag")
         target_dag.sync_to_db()
 
-        # dag_file_processor = DagFileProcessor(dag_ids=[], log=Mock())
-        task_instances_list = []
-        # task_instances_list = dag_file_processor._process_task_instances(
-        #    target_dag,
-        #    dag_runs=DagRun.find(dag_id='example_trigger_target_dag')
-        # )
-        assert not task_instances_list
+        target_dag_run = session.query(DagRun).filter(DagRun.dag_id == target_dag.dag_id).one_or_none()
+        assert target_dag_run is None
 
         job = BackfillJob(
             dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_first_depends_on_past=True
         )
         job.run()
 
-        task_instances_list = []
-        # task_instances_list = dag_file_processor._process_task_instances(
-        #    target_dag,
-        #    dag_runs=DagRun.find(dag_id='example_trigger_target_dag')
-        # )
+        dag_run = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).one_or_none()
+        assert dag_run is not None
+
+        task_instances_list = job._task_instances_for_dag_run(dag=dag, dag_run=dag_run)
 
         assert task_instances_list