You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by xd...@apache.org on 2021/03/30 07:36:55 UTC

[airflow] branch master updated: Add test to guard how DAG/Operator params work together (#15075)

This is an automated email from the ASF dual-hosted git repository.

xddeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new daa3f3c  Add test to guard how DAG/Operator params work together (#15075)
daa3f3c is described below

commit daa3f3cb59f8f7d19184e474c26bd21611806c63
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Tue Mar 30 09:36:21 2021 +0200

    Add test to guard how DAG/Operator params work together (#15075)
---
 tests/core/test_core.py | 37 +++++++++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/tests/core/test_core.py b/tests/core/test_core.py
index f6dd6ee..061a1a0 100644
--- a/tests/core/test_core.py
+++ b/tests/core/test_core.py
@@ -421,3 +421,40 @@ class TestCore(unittest.TestCase):
 
         assert context['prev_ds'] == execution_ds
         assert context['prev_ds_nodash'] == execution_ds_nodash
+
+    def test_dag_params_and_task_params(self):
+        # This test case guards how params of DAG and Operator work together.
+        # - If any key exists in either DAG's or Operator's params,
+        #   it is guaranteed to be available eventually.
+        # - If any key exists in both DAG's params and Operator's params,
+        #   the latter has precedence.
+        TI = TaskInstance
+
+        dag = DAG(
+            TEST_DAG_ID,
+            default_args=self.args,
+            schedule_interval=timedelta(weeks=1),
+            start_date=DEFAULT_DATE,
+            params={'key_1': 'value_1', 'key_2': 'value_2_old'},
+        )
+        task1 = DummyOperator(
+            task_id='task1',
+            dag=dag,
+            params={'key_2': 'value_2_new', 'key_3': 'value_3'},
+        )
+        task2 = DummyOperator(task_id='task2', dag=dag)
+        dag.create_dagrun(
+            run_type=DagRunType.SCHEDULED,
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING,
+            external_trigger=True,
+        )
+        task1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        task2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+        ti1 = TI(task=task1, execution_date=DEFAULT_DATE)
+        ti2 = TI(task=task2, execution_date=DEFAULT_DATE)
+        context1 = ti1.get_template_context()
+        context2 = ti2.get_template_context()
+
+        assert context1['params'] == {'key_1': 'value_1', 'key_2': 'value_2_new', 'key_3': 'value_3'}
+        assert context2['params'] == {'key_1': 'value_1', 'key_2': 'value_2_old'}