You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/04/28 12:55:00 UTC

[airflow] branch main updated: Add is_mapped field to Task response. (#23319)

This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f3d80c2a0d Add is_mapped field to Task response. (#23319)
f3d80c2a0d is described below

commit f3d80c2a0dce93b908d7c9de30c9cba673eb20d5
Author: Karthikeyan Singaravelan <ti...@gmail.com>
AuthorDate: Thu Apr 28 18:24:48 2022 +0530

    Add is_mapped field to Task response. (#23319)
    
    * Add is_mapped field to Task response.
    
    * Add is_mapped to schema file and add test for GetTasks.
---
 airflow/api_connexion/openapi/v1.yaml              |   3 +
 airflow/api_connexion/schemas/task_schema.py       |   1 +
 .../api_connexion/endpoints/test_task_endpoint.py  | 114 ++++++++++++++++++++-
 tests/api_connexion/schemas/test_task_schema.py    |   2 +
 4 files changed, 119 insertions(+), 1 deletion(-)

diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index a0c3150be1..d19c11aeba 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3013,6 +3013,9 @@ components:
         depends_on_past:
           type: boolean
           readOnly: true
+        is_mapped:
+          type: boolean
+          readOnly: true
         wait_for_downstream:
           type: boolean
           readOnly: true
diff --git a/airflow/api_connexion/schemas/task_schema.py b/airflow/api_connexion/schemas/task_schema.py
index 600d28ac0d..aa9a470308 100644
--- a/airflow/api_connexion/schemas/task_schema.py
+++ b/airflow/api_connexion/schemas/task_schema.py
@@ -58,6 +58,7 @@ class TaskSchema(Schema):
     sub_dag = fields.Nested(DAGSchema, dump_only=True)
     downstream_task_ids = fields.List(fields.String(), dump_only=True)
     params = fields.Method('get_params', dump_only=True)
+    is_mapped = fields.Boolean(dump_only=True)
 
     def _get_class_reference(self, obj):
         result = ClassReferenceSchema().dump(obj)
diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py b/tests/api_connexion/endpoints/test_task_endpoint.py
index d61af25d52..9748305d8c 100644
--- a/tests/api_connexion/endpoints/test_task_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_endpoint.py
@@ -52,8 +52,11 @@ def configured_app(minimal_app_for_api):
 
 class TestTaskEndpoint:
     dag_id = "test_dag"
+    mapped_dag_id = "test_mapped_task"
     task_id = "op1"
     task_id2 = 'op2'
+    task_id3 = "op3"
+    mapped_task_id = "mapped_task"
     task1_start_date = datetime(2020, 6, 15)
     task2_start_date = datetime(2020, 6, 16)
 
@@ -63,9 +66,13 @@ class TestTaskEndpoint:
             task1 = EmptyOperator(task_id=self.task_id, params={'foo': 'bar'})
             task2 = EmptyOperator(task_id=self.task_id2, start_date=self.task2_start_date)
 
+        with DAG(self.mapped_dag_id, start_date=self.task1_start_date) as mapped_dag:
+            task3 = EmptyOperator(task_id=self.task_id3)  # noqa
+            mapped_task = EmptyOperator.partial(task_id=self.mapped_task_id).expand()  # noqa
+
         task1 >> task2
         dag_bag = DagBag(os.devnull, include_examples=False)
-        dag_bag.dags = {dag.dag_id: dag}
+        dag_bag.dags = {dag.dag_id: dag, mapped_dag.dag_id: mapped_dag}
         configured_app.dag_bag = dag_bag  # type:ignore
 
     @staticmethod
@@ -120,6 +127,7 @@ class TestGetTask(TestTaskEndpoint):
             "ui_fgcolor": "#000",
             "wait_for_downstream": False,
             "weight_rule": "downstream",
+            "is_mapped": False,
         }
         response = self.client.get(
             f"/api/v1/dags/{self.dag_id}/tasks/{self.task_id}", environ_overrides={'REMOTE_USER': "test"}
@@ -127,6 +135,40 @@ class TestGetTask(TestTaskEndpoint):
         assert response.status_code == 200
         assert response.json == expected
 
+    def test_mapped_task(self):
+        expected = {
+            "class_ref": {"class_name": "EmptyOperator", "module_path": "airflow.operators.empty"},
+            "depends_on_past": False,
+            "downstream_task_ids": [],
+            "end_date": None,
+            "execution_timeout": None,
+            "extra_links": [],
+            "is_mapped": True,
+            "owner": "airflow",
+            "params": {},
+            "pool": "default_pool",
+            "pool_slots": 1.0,
+            "priority_weight": 1.0,
+            "queue": "default",
+            "retries": 0.0,
+            "retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300},
+            "retry_exponential_backoff": False,
+            "start_date": "2020-06-15T00:00:00+00:00",
+            "task_id": "mapped_task",
+            "template_fields": [],
+            "trigger_rule": "all_success",
+            "ui_color": "#e8f7e4",
+            "ui_fgcolor": "#000",
+            "wait_for_downstream": False,
+            "weight_rule": "downstream",
+        }
+        response = self.client.get(
+            f"/api/v1/dags/{self.mapped_dag_id}/tasks/{self.mapped_task_id}",
+            environ_overrides={'REMOTE_USER': "test"},
+        )
+        assert response.status_code == 200
+        assert response.json == expected
+
     def test_should_respond_200_serialized(self):
 
         # Get the dag out of the dagbag before we patch it to an empty one
@@ -170,6 +212,7 @@ class TestGetTask(TestTaskEndpoint):
             "ui_fgcolor": "#000",
             "wait_for_downstream": False,
             "weight_rule": "downstream",
+            "is_mapped": False,
         }
         response = self.client.get(
             f"/api/v1/dags/{self.dag_id}/tasks/{self.task_id}", environ_overrides={'REMOTE_USER': "test"}
@@ -235,6 +278,7 @@ class TestGetTasks(TestTaskEndpoint):
                     "ui_fgcolor": "#000",
                     "wait_for_downstream": False,
                     "weight_rule": "downstream",
+                    "is_mapped": False,
                 },
                 {
                     "class_ref": {
@@ -263,6 +307,7 @@ class TestGetTasks(TestTaskEndpoint):
                     "ui_fgcolor": "#000",
                     "wait_for_downstream": False,
                     "weight_rule": "downstream",
+                    "is_mapped": False,
                 },
             ],
             "total_entries": 2,
@@ -273,6 +318,73 @@ class TestGetTasks(TestTaskEndpoint):
         assert response.status_code == 200
         assert response.json == expected
 
+    def test_get_tasks_mapped(self):
+        expected = {
+            "tasks": [
+                {
+                    "class_ref": {"class_name": "EmptyOperator", "module_path": "airflow.operators.empty"},
+                    "depends_on_past": False,
+                    "downstream_task_ids": [],
+                    "end_date": None,
+                    "execution_timeout": None,
+                    "extra_links": [],
+                    "is_mapped": True,
+                    "owner": "airflow",
+                    "params": {},
+                    "pool": "default_pool",
+                    "pool_slots": 1.0,
+                    "priority_weight": 1.0,
+                    "queue": "default",
+                    "retries": 0.0,
+                    "retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300},
+                    "retry_exponential_backoff": False,
+                    "start_date": "2020-06-15T00:00:00+00:00",
+                    "task_id": "mapped_task",
+                    "template_fields": [],
+                    "trigger_rule": "all_success",
+                    "ui_color": "#e8f7e4",
+                    "ui_fgcolor": "#000",
+                    "wait_for_downstream": False,
+                    "weight_rule": "downstream",
+                },
+                {
+                    "class_ref": {
+                        "class_name": "EmptyOperator",
+                        "module_path": "airflow.operators.empty",
+                    },
+                    "depends_on_past": False,
+                    "downstream_task_ids": [],
+                    "end_date": None,
+                    "execution_timeout": None,
+                    "extra_links": [],
+                    "owner": "airflow",
+                    "params": {},
+                    "pool": "default_pool",
+                    "pool_slots": 1.0,
+                    "priority_weight": 1.0,
+                    "queue": "default",
+                    "retries": 0.0,
+                    "retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
+                    "retry_exponential_backoff": False,
+                    "start_date": "2020-06-15T00:00:00+00:00",
+                    "task_id": self.task_id3,
+                    "template_fields": [],
+                    "trigger_rule": "all_success",
+                    "ui_color": "#e8f7e4",
+                    "ui_fgcolor": "#000",
+                    "wait_for_downstream": False,
+                    "weight_rule": "downstream",
+                    "is_mapped": False,
+                },
+            ],
+            "total_entries": 2,
+        }
+        response = self.client.get(
+            f"/api/v1/dags/{self.mapped_dag_id}/tasks", environ_overrides={'REMOTE_USER': "test"}
+        )
+        assert response.status_code == 200
+        assert response.json == expected
+
     def test_should_respond_200_ascending_order_by_start_date(self):
         response = self.client.get(
             f"/api/v1/dags/{self.dag_id}/tasks?order_by=start_date",
diff --git a/tests/api_connexion/schemas/test_task_schema.py b/tests/api_connexion/schemas/test_task_schema.py
index 6724ec7d0b..07f4b592da 100644
--- a/tests/api_connexion/schemas/test_task_schema.py
+++ b/tests/api_connexion/schemas/test_task_schema.py
@@ -56,6 +56,7 @@ class TestTaskSchema:
             "ui_fgcolor": "#000",
             "wait_for_downstream": False,
             "weight_rule": "downstream",
+            "is_mapped": False,
         }
         assert expected == result
 
@@ -101,6 +102,7 @@ class TestTaskCollectionSchema:
                     "ui_fgcolor": "#000",
                     "wait_for_downstream": False,
                     "weight_rule": "downstream",
+                    "is_mapped": False,
                 }
             ],
             "total_entries": 1,