You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/09/19 20:38:14 UTC
[airflow] branch main updated: template rendering issue fix (#26390)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4bf0cb9872 template rendering issue fix (#26390)
4bf0cb9872 is described below
commit 4bf0cb98724a2cf04aab6359881a87aeb9cec0ce
Author: Bowrna <ma...@gmail.com>
AuthorDate: Tue Sep 20 02:07:56 2022 +0530
template rendering issue fix (#26390)
---
airflow/decorators/python.py | 4 +--
airflow/example_dags/example_python_operator.py | 11 ++++++-
airflow/example_dags/sql/sample.sql | 24 +++++++++++++++
docs/apache-airflow/howto/operator/python.rst | 7 +++++
.../endpoints/test_task_instance_endpoint.py | 36 ++++++++++++++--------
tests/serialization/test_dag_serialization.py | 8 ++---
6 files changed, 70 insertions(+), 20 deletions(-)
diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py
index 0568e4809e..8800e7bf9a 100644
--- a/airflow/decorators/python.py
+++ b/airflow/decorators/python.py
@@ -35,8 +35,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
"""
- template_fields: Sequence[str] = ('op_args', 'op_kwargs')
- template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
+ template_fields: Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')
+ template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}
# since we won't mutate the arguments, we should just do the shallow copy
# there are some cases we can't deepcopy the objects (e.g protobuf).
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index d8284b19ac..3be89bc91c 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -47,6 +47,7 @@ with DAG(
catchup=False,
tags=['example'],
) as dag:
+
# [START howto_operator_python]
@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
@@ -58,6 +59,14 @@ with DAG(
run_this = print_context()
# [END howto_operator_python]
+ # [START howto_operator_python_render_sql]
+ @task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
+ def log_sql(**kwargs):
+ logging.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
+
+ log_the_sql = log_sql()
+ # [END howto_operator_python_render_sql]
+
# [START howto_operator_python_kwargs]
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
@@ -69,7 +78,7 @@ with DAG(
sleeping_task = my_sleeping_function(random_base=float(i) / 10)
- run_this >> sleeping_task
+ run_this >> log_the_sql >> sleeping_task
# [END howto_operator_python_kwargs]
if not shutil.which("virtualenv"):
diff --git a/airflow/example_dags/sql/sample.sql b/airflow/example_dags/sql/sample.sql
new file mode 100644
index 0000000000..23af6ab4b9
--- /dev/null
+++ b/airflow/example_dags/sql/sample.sql
@@ -0,0 +1,24 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+
+CREATE TABLE Orders (
+ order_id INT PRIMARY KEY,
+ name TEXT,
+ description TEXT
+)
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst
index 7128a2a5e0..8ba2ac64eb 100644
--- a/docs/apache-airflow/howto/operator/python.rst
+++ b/docs/apache-airflow/howto/operator/python.rst
@@ -55,6 +55,13 @@ argument.
The ``templates_dict`` argument is templated, so each value in the dictionary
is evaluated as a :ref:`Jinja template <concepts:jinja-templating>`.
+.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_python_render_sql]
+ :end-before: [END howto_operator_python_render_sql]
+
+
.. _howto/operator:PythonVirtualenvOperator:
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 44f21aa95a..092e282c55 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -204,7 +204,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
- "priority_weight": 8,
+ "priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": None,
@@ -258,7 +258,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
- "priority_weight": 8,
+ "priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": None,
@@ -302,7 +302,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
- "priority_weight": 8,
+ "priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": None,
@@ -349,7 +349,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
- "priority_weight": 8,
+ "priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": {
@@ -367,7 +367,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"try_number": 0,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
- "rendered_fields": {'op_args': [], 'op_kwargs': {}},
+ "rendered_fields": {'op_args': [], 'op_kwargs': {}, 'templates_dict': None},
"trigger": None,
"triggerer_job": None,
}
@@ -411,7 +411,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
- "priority_weight": 8,
+ "priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
'sla_miss': None,
@@ -421,7 +421,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
"try_number": 0,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
- "rendered_fields": {'op_args': [], 'op_kwargs': {}},
+ "rendered_fields": {'op_args': [], 'op_kwargs': {}, 'templates_dict': None},
"trigger": None,
"triggerer_job": None,
}
@@ -832,8 +832,8 @@ class TestGetTaskInstancesBatch(TestTaskInstanceEndpoint):
(
"with dag filter",
{"dag_ids": ["example_python_operator", "example_skip_dag"]},
- 16,
- 16,
+ 17,
+ 17,
),
],
)
@@ -1154,6 +1154,10 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=4),
"state": State.RUNNING,
},
+ {
+ "execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=5),
+ "state": State.RUNNING,
+ },
]
self.create_task_instances(
@@ -1182,30 +1186,36 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_1',
'execution_date': '2020-01-02T00:00:00+00:00',
- 'task_id': 'sleep_for_0',
+ 'task_id': 'log_sql_query',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_2',
'execution_date': '2020-01-03T00:00:00+00:00',
- 'task_id': 'sleep_for_1',
+ 'task_id': 'sleep_for_0',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_3',
'execution_date': '2020-01-04T00:00:00+00:00',
- 'task_id': 'sleep_for_2',
+ 'task_id': 'sleep_for_1',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_4',
'execution_date': '2020-01-05T00:00:00+00:00',
+ 'task_id': 'sleep_for_2',
+ },
+ {
+ 'dag_id': 'example_python_operator',
+ 'dag_run_id': 'TEST_DAG_RUN_ID_5',
+ 'execution_date': '2020-01-06T00:00:00+00:00',
'task_id': 'sleep_for_3',
},
]
for task_instance in expected_response:
assert task_instance in response.json["task_instances"]
- assert 5 == len(response.json["task_instances"])
+ assert 6 == len(response.json["task_instances"])
assert 0 == failed_dag_runs, 0
def test_should_raises_401_unauthenticated(self):
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index aa409183e8..44a218290e 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -2151,8 +2151,8 @@ def test_taskflow_expand_serde():
'ui_fgcolor': '#000',
'task_id': 'x',
'template_ext': [],
- 'template_fields': ['op_args', 'op_kwargs'],
- 'template_fields_renderers': {"op_args": "py", "op_kwargs": "py"},
+ 'template_fields': ['templates_dict', 'op_args', 'op_kwargs'],
+ 'template_fields_renderers': {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"},
"_disallow_kwargs_override": False,
'_expand_input_attr': 'op_kwargs_expand_input',
}
@@ -2234,8 +2234,8 @@ def test_taskflow_expand_kwargs_serde(strict):
'ui_fgcolor': '#000',
'task_id': 'x',
'template_ext': [],
- 'template_fields': ['op_args', 'op_kwargs'],
- 'template_fields_renderers': {"op_args": "py", "op_kwargs": "py"},
+ 'template_fields': ['templates_dict', 'op_args', 'op_kwargs'],
+ 'template_fields_renderers': {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"},
"_disallow_kwargs_override": strict,
'_expand_input_attr': 'op_kwargs_expand_input',
}