You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/19 20:38:14 UTC

[airflow] branch main updated: template rendering issue fix (#26390)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4bf0cb9872 template rendering issue fix (#26390)
4bf0cb9872 is described below

commit 4bf0cb98724a2cf04aab6359881a87aeb9cec0ce
Author: Bowrna <ma...@gmail.com>
AuthorDate: Tue Sep 20 02:07:56 2022 +0530

    template rendering issue fix (#26390)
---
 airflow/decorators/python.py                       |  4 +--
 airflow/example_dags/example_python_operator.py    | 11 ++++++-
 airflow/example_dags/sql/sample.sql                | 24 +++++++++++++++
 docs/apache-airflow/howto/operator/python.rst      |  7 +++++
 .../endpoints/test_task_instance_endpoint.py       | 36 ++++++++++++++--------
 tests/serialization/test_dag_serialization.py      |  8 ++---
 6 files changed, 70 insertions(+), 20 deletions(-)

diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py
index 0568e4809e..8800e7bf9a 100644
--- a/airflow/decorators/python.py
+++ b/airflow/decorators/python.py
@@ -35,8 +35,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
         multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
     """
 
-    template_fields: Sequence[str] = ('op_args', 'op_kwargs')
-    template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+    template_fields: Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')
+    template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}
 
     # since we won't mutate the arguments, we should just do the shallow copy
     # there are some cases we can't deepcopy the objects (e.g protobuf).
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index d8284b19ac..3be89bc91c 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -47,6 +47,7 @@ with DAG(
     catchup=False,
     tags=['example'],
 ) as dag:
+
     # [START howto_operator_python]
     @task(task_id="print_the_context")
     def print_context(ds=None, **kwargs):
@@ -58,6 +59,14 @@ with DAG(
     run_this = print_context()
     # [END howto_operator_python]
 
+    # [START howto_operator_python_render_sql]
+    @task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
+    def log_sql(**kwargs):
+        logging.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
+
+    log_the_sql = log_sql()
+    # [END howto_operator_python_render_sql]
+
     # [START howto_operator_python_kwargs]
     # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
     for i in range(5):
@@ -69,7 +78,7 @@ with DAG(
 
         sleeping_task = my_sleeping_function(random_base=float(i) / 10)
 
-        run_this >> sleeping_task
+        run_this >> log_the_sql >> sleeping_task
     # [END howto_operator_python_kwargs]
 
     if not shutil.which("virtualenv"):
diff --git a/airflow/example_dags/sql/sample.sql b/airflow/example_dags/sql/sample.sql
new file mode 100644
index 0000000000..23af6ab4b9
--- /dev/null
+++ b/airflow/example_dags/sql/sample.sql
@@ -0,0 +1,24 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+
+CREATE TABLE Orders (
+    order_id INT PRIMARY KEY,
+    name TEXT,
+    description TEXT
+)
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index 7128a2a5e0..8ba2ac64eb 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -55,6 +55,13 @@ argument.
 The ``templates_dict`` argument is templated, so each value in the dictionary
 is evaluated as a :ref:`Jinja template <concepts:jinja-templating>`.
 
+.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_python_render_sql]
+    :end-before: [END howto_operator_python_render_sql]
+
+
 
 
 .. _howto/operator:PythonVirtualenvOperator:
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 44f21aa95a..092e282c55 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -204,7 +204,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "pid": 100,
             "pool": "default_pool",
             "pool_slots": 1,
-            "priority_weight": 8,
+            "priority_weight": 9,
             "queue": "default_queue",
             "queued_when": None,
             "sla_miss": None,
@@ -258,7 +258,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "pid": 100,
             "pool": "default_pool",
             "pool_slots": 1,
-            "priority_weight": 8,
+            "priority_weight": 9,
             "queue": "default_queue",
             "queued_when": None,
             "sla_miss": None,
@@ -302,7 +302,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "pid": 100,
             "pool": "default_pool",
             "pool_slots": 1,
-            "priority_weight": 8,
+            "priority_weight": 9,
             "queue": "default_queue",
             "queued_when": None,
             "sla_miss": None,
@@ -349,7 +349,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "pid": 100,
             "pool": "default_pool",
             "pool_slots": 1,
-            "priority_weight": 8,
+            "priority_weight": 9,
             "queue": "default_queue",
             "queued_when": None,
             "sla_miss": {
@@ -367,7 +367,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "try_number": 0,
             "unixname": getuser(),
             "dag_run_id": "TEST_DAG_RUN_ID",
-            "rendered_fields": {'op_args': [], 'op_kwargs': {}},
+            "rendered_fields": {'op_args': [], 'op_kwargs': {}, 'templates_dict': None},
             "trigger": None,
             "triggerer_job": None,
         }
@@ -411,7 +411,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
                 "pid": 100,
                 "pool": "default_pool",
                 "pool_slots": 1,
-                "priority_weight": 8,
+                "priority_weight": 9,
                 "queue": "default_queue",
                 "queued_when": None,
                 'sla_miss': None,
@@ -421,7 +421,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
                 "try_number": 0,
                 "unixname": getuser(),
                 "dag_run_id": "TEST_DAG_RUN_ID",
-                "rendered_fields": {'op_args': [], 'op_kwargs': {}},
+                "rendered_fields": {'op_args': [], 'op_kwargs': {}, 'templates_dict': None},
                 "trigger": None,
                 "triggerer_job": None,
             }
@@ -832,8 +832,8 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
             (
                 "with dag filter",
                 {"dag_ids": ["example_python_operator", "example_skip_dag"]},
-                16,
-                16,
+                17,
+                17,
             ),
         ],
     )
@@ -1154,6 +1154,10 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
                 "execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=4),
                 "state": State.RUNNING,
             },
+            {
+                "execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=5),
+                "state": State.RUNNING,
+            },
         ]
 
         self.create_task_instances(
@@ -1182,30 +1186,36 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
                 'dag_id': 'example_python_operator',
                 'dag_run_id': 'TEST_DAG_RUN_ID_1',
                 'execution_date': '2020-01-02T00:00:00+00:00',
-                'task_id': 'sleep_for_0',
+                'task_id': 'log_sql_query',
             },
             {
                 'dag_id': 'example_python_operator',
                 'dag_run_id': 'TEST_DAG_RUN_ID_2',
                 'execution_date': '2020-01-03T00:00:00+00:00',
-                'task_id': 'sleep_for_1',
+                'task_id': 'sleep_for_0',
             },
             {
                 'dag_id': 'example_python_operator',
                 'dag_run_id': 'TEST_DAG_RUN_ID_3',
                 'execution_date': '2020-01-04T00:00:00+00:00',
-                'task_id': 'sleep_for_2',
+                'task_id': 'sleep_for_1',
             },
             {
                 'dag_id': 'example_python_operator',
                 'dag_run_id': 'TEST_DAG_RUN_ID_4',
                 'execution_date': '2020-01-05T00:00:00+00:00',
+                'task_id': 'sleep_for_2',
+            },
+            {
+                'dag_id': 'example_python_operator',
+                'dag_run_id': 'TEST_DAG_RUN_ID_5',
+                'execution_date': '2020-01-06T00:00:00+00:00',
                 'task_id': 'sleep_for_3',
             },
         ]
         for task_instance in expected_response:
             assert task_instance in response.json["task_instances"]
-        assert 5 == len(response.json["task_instances"])
+        assert 6 == len(response.json["task_instances"])
         assert 0 == failed_dag_runs, 0
 
     def test_should_raises_401_unauthenticated(self):
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index aa409183e8..44a218290e 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -2151,8 +2151,8 @@ def test_taskflow_expand_serde():
         'ui_fgcolor': '#000',
         'task_id': 'x',
         'template_ext': [],
-        'template_fields': ['op_args', 'op_kwargs'],
-        'template_fields_renderers': {"op_args": "py", "op_kwargs": "py"},
+        'template_fields': ['templates_dict', 'op_args', 'op_kwargs'],
+        'template_fields_renderers': {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"},
         "_disallow_kwargs_override": False,
         '_expand_input_attr': 'op_kwargs_expand_input',
     }
@@ -2234,8 +2234,8 @@ def test_taskflow_expand_kwargs_serde(strict):
         'ui_fgcolor': '#000',
         'task_id': 'x',
         'template_ext': [],
-        'template_fields': ['op_args', 'op_kwargs'],
-        'template_fields_renderers': {"op_args": "py", "op_kwargs": "py"},
+        'template_fields': ['templates_dict', 'op_args', 'op_kwargs'],
+        'template_fields_renderers': {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"},
         "_disallow_kwargs_override": strict,
         '_expand_input_attr': 'op_kwargs_expand_input',
     }