You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/08/17 08:29:32 UTC

[airflow] 04/07: Improve `dag_maker` fixture (#17324)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2239fc07b5f62bf02373f5ef51f22886fb014dcd
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Mon Aug 2 07:37:40 2021 +0100

    Improve `dag_maker` fixture (#17324)
    
    This PR improves the `dag_maker` fixture to enable creating the DAG, the DagModel, and the DagRun separately.
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
    (cherry picked from commit 5c1e09cafacea922b9281e901db7da7cadb3e9be)
---
 tests/jobs/test_local_task_job.py | 21 ++++++---------------
 1 file changed, 6 insertions(+), 15 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 2e28332..c18e6e5 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -836,34 +836,25 @@ class TestLocalTaskJob:
         assert retry_callback_called.value == 1
         assert task_terminated_externally.value == 1
 
-    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self, dag_maker):
         """Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
-        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
-        op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
+        with dag_maker(dag_id='test_dags') as dag:
+            op1 = PythonOperator(task_id='dummy', python_callable=lambda: True)
 
         session = settings.Session()
-        orm_dag = DagModel(
-            dag_id=dag.dag_id,
+        dag_maker.make_dagmodel(
             has_task_concurrency_limits=False,
-            next_dagrun=dag.start_date,
             next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
             is_active=True,
             is_paused=True,
         )
-        session.add(orm_dag)
-        session.flush()
         # Write Dag to DB
         dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
         dagbag.bag_dag(dag, root_dag=dag)
         dagbag.sync_to_db()
 
-        dr = dag.create_dagrun(
-            run_type=DagRunType.SCHEDULED,
-            state=State.RUNNING,
-            execution_date=DEFAULT_DATE,
-            start_date=DEFAULT_DATE,
-            session=session,
-        )
+        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
         assert dr.state == State.RUNNING
         ti = TaskInstance(op1, dr.execution_date)
         job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())