You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/12 00:09:34 UTC
[airflow] branch v2-5-test updated: Fix "airflow tasks render" cli command for mapped task instances (#28698)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-5-test by this push:
new b36f24753f Fix "airflow tasks render" cli command for mapped task instances (#28698)
b36f24753f is described below
commit b36f24753fc3584648d505a7d13fbc6e6958c1c3
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jan 4 21:43:20 2023 +0100
Fix "airflow tasks render" cli command for mapped task instances (#28698)
The fix is to read the 'template_fields' attribute directly from the task (instead of from
its class), since both mapped and unmapped tasks now have that attribute.
I also had to use ti.task instead of the task returned by dag.get_task, because the latter raised
`AttributeError: 'DecoratedMappedOperator' object has no attribute 'templates_dict'`.
I wonder whether that error is itself a bug.
(cherry picked from commit 1da17be37627385fed7fc06584d72e0abda6a1b5)
---
airflow/cli/commands/task_command.py | 9 ++---
tests/cli/commands/test_task_command.py | 62 +++++++++++++++++++++++++++++++++
2 files changed, 67 insertions(+), 4 deletions(-)
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 93e7c81146..2f37579c35 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -591,21 +591,22 @@ def task_test(args, dag=None):
@cli_utils.action_cli(check_db=False)
@suppress_logs_and_warning
-def task_render(args):
+def task_render(args, dag=None):
"""Renders and displays templated fields for a given task."""
- dag = get_dag(args.subdir, args.dag_id)
+ if not dag:
+ dag = get_dag(args.subdir, args.dag_id)
task = dag.get_task(task_id=args.task_id)
ti, _ = _get_ti(
task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="memory"
)
ti.render_templates()
- for attr in task.__class__.template_fields:
+ for attr in task.template_fields:
print(
textwrap.dedent(
f""" # ----------------------------------------------------------
# property: {attr}
# ----------------------------------------------------------
- {getattr(task, attr)}
+ {getattr(ti.task, attr)}
"""
)
)
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index a062f31d4f..864ea10408 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -41,6 +41,7 @@ from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagRunNotFound
from airflow.models import DagBag, DagRun, Pool, TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
+from airflow.operators.bash import BashOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
@@ -389,6 +390,67 @@ class TestCliTasks:
assert 'echo "2016-01-01"' in output
assert 'echo "2016-01-08"' in output
+ def test_mapped_task_render(self):
+ """
+ tasks render should render and display templated fields for a given mapped task
+ """
+ with redirect_stdout(io.StringIO()) as stdout:
+ task_command.task_render(
+ self.parser.parse_args(
+ [
+ "tasks",
+ "render",
+ "test_mapped_classic",
+ "consumer_literal",
+ "2022-01-01",
+ "--map-index",
+ "0",
+ ]
+ )
+ )
+ # The dag test_mapped_classic has op_args=[[1], [2], [3]], so the mapped task instance with
+ # map index 0 should have op_args=[1].
+ output = stdout.getvalue()
+ assert "[1]" in output
+ assert "[2]" not in output
+ assert "[3]" not in output
+ assert "property: op_args" in output
+
+ def test_mapped_task_render_with_template(self, dag_maker):
+ """
+ tasks render should render and display templated fields for a given mapped task
+ """
+ with dag_maker() as dag:
+ templated_command = """
+ {% for i in range(5) %}
+ echo "{{ ds }}"
+ echo "{{ macros.ds_add(ds, 7)}}"
+ {% endfor %}
+ """
+ commands = [templated_command, "echo 1"]
+
+ BashOperator.partial(task_id="some_command").expand(bash_command=commands)
+
+ with redirect_stdout(io.StringIO()) as stdout:
+ task_command.task_render(
+ self.parser.parse_args(
+ [
+ "tasks",
+ "render",
+ "test_dag",
+ "some_command",
+ "2022-01-01",
+ "--map-index",
+ "0",
+ ]
+ ),
+ dag=dag,
+ )
+
+ output = stdout.getvalue()
+ assert 'echo "2022-01-01"' in output
+ assert 'echo "2022-01-08"' in output
+
def test_cli_run_when_pickle_and_dag_cli_method_selected(self):
"""
tasks run should return an AirflowException when invalid pickle_id is passed