You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/01/12 00:09:34 UTC

[airflow] branch v2-5-test updated: Fix "airflow tasks render" cli command for mapped task instances (#28698)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-5-test by this push:
     new b36f24753f Fix "airflow tasks render" cli command for mapped task instances (#28698)
b36f24753f is described below

commit b36f24753fc3584648d505a7d13fbc6e6958c1c3
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Wed Jan 4 21:43:20 2023 +0100

    Fix "airflow tasks render" cli command for mapped task instances (#28698)
    
    The fix was to read the 'template_fields' attr directly from the task rather than from
    its class, since both mapped and unmapped tasks now have that attribute.
    I also had to use ti.task instead of the task returned by dag.get_task due to this error:
    `AttributeError: 'DecoratedMappedOperator' object has no attribute 'templates_dict'`.
    I wonder whether this is a bug.
    
    (cherry picked from commit 1da17be37627385fed7fc06584d72e0abda6a1b5)
---
 airflow/cli/commands/task_command.py    |  9 ++---
 tests/cli/commands/test_task_command.py | 62 +++++++++++++++++++++++++++++++++
 2 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 93e7c81146..2f37579c35 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -591,21 +591,22 @@ def task_test(args, dag=None):
 
 @cli_utils.action_cli(check_db=False)
 @suppress_logs_and_warning
-def task_render(args):
+def task_render(args, dag=None):
     """Renders and displays templated fields for a given task."""
-    dag = get_dag(args.subdir, args.dag_id)
+    if not dag:
+        dag = get_dag(args.subdir, args.dag_id)
     task = dag.get_task(task_id=args.task_id)
     ti, _ = _get_ti(
         task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="memory"
     )
     ti.render_templates()
-    for attr in task.__class__.template_fields:
+    for attr in task.template_fields:
         print(
             textwrap.dedent(
                 f"""        # ----------------------------------------------------------
         # property: {attr}
         # ----------------------------------------------------------
-        {getattr(task, attr)}
+        {getattr(ti.task, attr)}
         """
             )
         )
diff --git a/tests/cli/commands/test_task_command.py b/tests/cli/commands/test_task_command.py
index a062f31d4f..864ea10408 100644
--- a/tests/cli/commands/test_task_command.py
+++ b/tests/cli/commands/test_task_command.py
@@ -41,6 +41,7 @@ from airflow.configuration import conf
 from airflow.exceptions import AirflowException, DagRunNotFound
 from airflow.models import DagBag, DagRun, Pool, TaskInstance
 from airflow.models.serialized_dag import SerializedDagModel
+from airflow.operators.bash import BashOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State
@@ -389,6 +390,67 @@ class TestCliTasks:
         assert 'echo "2016-01-01"' in output
         assert 'echo "2016-01-08"' in output
 
+    def test_mapped_task_render(self):
+        """
+        tasks render should render and displays templated fields for a given mapping task
+        """
+        with redirect_stdout(io.StringIO()) as stdout:
+            task_command.task_render(
+                self.parser.parse_args(
+                    [
+                        "tasks",
+                        "render",
+                        "test_mapped_classic",
+                        "consumer_literal",
+                        "2022-01-01",
+                        "--map-index",
+                        "0",
+                    ]
+                )
+            )
+        # the dag test_mapped_classic has op_args=[[1], [2], [3]], so the first mapping task should have
+        # op_args=[1]
+        output = stdout.getvalue()
+        assert "[1]" in output
+        assert "[2]" not in output
+        assert "[3]" not in output
+        assert "property: op_args" in output
+
+    def test_mapped_task_render_with_template(self, dag_maker):
+        """
+        tasks render should render and displays templated fields for a given mapping task
+        """
+        with dag_maker() as dag:
+            templated_command = """
+            {% for i in range(5) %}
+                echo "{{ ds }}"
+                echo "{{ macros.ds_add(ds, 7)}}"
+            {% endfor %}
+            """
+            commands = [templated_command, "echo 1"]
+
+            BashOperator.partial(task_id="some_command").expand(bash_command=commands)
+
+        with redirect_stdout(io.StringIO()) as stdout:
+            task_command.task_render(
+                self.parser.parse_args(
+                    [
+                        "tasks",
+                        "render",
+                        "test_dag",
+                        "some_command",
+                        "2022-01-01",
+                        "--map-index",
+                        "0",
+                    ]
+                ),
+                dag=dag,
+            )
+
+        output = stdout.getvalue()
+        assert 'echo "2022-01-01"' in output
+        assert 'echo "2022-01-08"' in output
+
     def test_cli_run_when_pickle_and_dag_cli_method_selected(self):
         """
         tasks run should return an AirflowException when invalid pickle_id is passed