You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/08/17 08:29:32 UTC
[airflow] 04/07: Improve `dag_maker` fixture (#17324)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2239fc07b5f62bf02373f5ef51f22886fb014dcd
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Mon Aug 2 07:37:40 2021 +0100
Improve `dag_maker` fixture (#17324)
This PR improves the `dag_maker` fixture so that the DAG, its DagModel, and a DagRun can each be created separately.
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
(cherry picked from commit 5c1e09cafacea922b9281e901db7da7cadb3e9be)
---
tests/jobs/test_local_task_job.py | 21 ++++++---------------
1 file changed, 6 insertions(+), 15 deletions(-)
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 2e28332..c18e6e5 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -836,34 +836,25 @@ class TestLocalTaskJob:
assert retry_callback_called.value == 1
assert task_terminated_externally.value == 1
- def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
+ def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self, dag_maker):
"""Test that with DAG paused, DagRun state will update when the tasks finishes the run"""
- dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
- op1 = PythonOperator(task_id='dummy', dag=dag, owner='airflow', python_callable=lambda: True)
+ with dag_maker(dag_id='test_dags') as dag:
+ op1 = PythonOperator(task_id='dummy', python_callable=lambda: True)
session = settings.Session()
- orm_dag = DagModel(
- dag_id=dag.dag_id,
+ dag_maker.make_dagmodel(
has_task_concurrency_limits=False,
- next_dagrun=dag.start_date,
next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
is_active=True,
is_paused=True,
)
- session.add(orm_dag)
- session.flush()
# Write Dag to DB
dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
dagbag.bag_dag(dag, root_dag=dag)
dagbag.sync_to_db()
- dr = dag.create_dagrun(
- run_type=DagRunType.SCHEDULED,
- state=State.RUNNING,
- execution_date=DEFAULT_DATE,
- start_date=DEFAULT_DATE,
- session=session,
- )
+ dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
assert dr.state == State.RUNNING
ti = TaskInstance(op1, dr.execution_date)
job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())