You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xd...@apache.org on 2021/03/30 07:36:55 UTC
[airflow] branch master updated: Add test to guard how DAG/Operator
params work together (#15075)
This is an automated email from the ASF dual-hosted git repository.
xddeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new daa3f3c Add test to guard how DAG/Operator params work together (#15075)
daa3f3c is described below
commit daa3f3cb59f8f7d19184e474c26bd21611806c63
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Tue Mar 30 09:36:21 2021 +0200
Add test to guard how DAG/Operator params work together (#15075)
---
tests/core/test_core.py | 37 +++++++++++++++++++++++++++++++++++++
1 file changed, 37 insertions(+)
diff --git a/tests/core/test_core.py b/tests/core/test_core.py
index f6dd6ee..061a1a0 100644
--- a/tests/core/test_core.py
+++ b/tests/core/test_core.py
@@ -421,3 +421,40 @@ class TestCore(unittest.TestCase):
assert context['prev_ds'] == execution_ds
assert context['prev_ds_nodash'] == execution_ds_nodash
+
+ def test_dag_params_and_task_params(self):
+ # This test case guards how params of DAG and Operator work together.
+ # - If any key exists in either DAG's or Operator's params,
+ # it is guaranteed to be available eventually.
+ # - If any key exists in both DAG's params and Operator's params,
+ # the latter has precedence.
+ TI = TaskInstance
+
+ dag = DAG(
+ TEST_DAG_ID,
+ default_args=self.args,
+ schedule_interval=timedelta(weeks=1),
+ start_date=DEFAULT_DATE,
+ params={'key_1': 'value_1', 'key_2': 'value_2_old'},
+ )
+ task1 = DummyOperator(
+ task_id='task1',
+ dag=dag,
+ params={'key_2': 'value_2_new', 'key_3': 'value_3'},
+ )
+ task2 = DummyOperator(task_id='task2', dag=dag)
+ dag.create_dagrun(
+ run_type=DagRunType.SCHEDULED,
+ execution_date=DEFAULT_DATE,
+ state=State.RUNNING,
+ external_trigger=True,
+ )
+ task1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ task2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+ ti1 = TI(task=task1, execution_date=DEFAULT_DATE)
+ ti2 = TI(task=task2, execution_date=DEFAULT_DATE)
+ context1 = ti1.get_template_context()
+ context2 = ti2.get_template_context()
+
+ assert context1['params'] == {'key_1': 'value_1', 'key_2': 'value_2_new', 'key_3': 'value_3'}
+ assert context2['params'] == {'key_1': 'value_1', 'key_2': 'value_2_old'}