You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bb...@apache.org on 2022/04/28 12:55:00 UTC
[airflow] branch main updated: Add is_mapped field to Task response. (#23319)
This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f3d80c2a0d Add is_mapped field to Task response. (#23319)
f3d80c2a0d is described below
commit f3d80c2a0dce93b908d7c9de30c9cba673eb20d5
Author: Karthikeyan Singaravelan <ti...@gmail.com>
AuthorDate: Thu Apr 28 18:24:48 2022 +0530
Add is_mapped field to Task response. (#23319)
* Add is_mapped field to Task response.
* Add is_mapped to schema file and add test for GetTasks.
---
airflow/api_connexion/openapi/v1.yaml | 3 +
airflow/api_connexion/schemas/task_schema.py | 1 +
.../api_connexion/endpoints/test_task_endpoint.py | 114 ++++++++++++++++++++-
tests/api_connexion/schemas/test_task_schema.py | 2 +
4 files changed, 119 insertions(+), 1 deletion(-)
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index a0c3150be1..d19c11aeba 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3013,6 +3013,9 @@ components:
depends_on_past:
type: boolean
readOnly: true
+ is_mapped:
+ type: boolean
+ readOnly: true
wait_for_downstream:
type: boolean
readOnly: true
diff --git a/airflow/api_connexion/schemas/task_schema.py b/airflow/api_connexion/schemas/task_schema.py
index 600d28ac0d..aa9a470308 100644
--- a/airflow/api_connexion/schemas/task_schema.py
+++ b/airflow/api_connexion/schemas/task_schema.py
@@ -58,6 +58,7 @@ class TaskSchema(Schema):
sub_dag = fields.Nested(DAGSchema, dump_only=True)
downstream_task_ids = fields.List(fields.String(), dump_only=True)
params = fields.Method('get_params', dump_only=True)
+ is_mapped = fields.Boolean(dump_only=True)
def _get_class_reference(self, obj):
result = ClassReferenceSchema().dump(obj)
diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py b/tests/api_connexion/endpoints/test_task_endpoint.py
index d61af25d52..9748305d8c 100644
--- a/tests/api_connexion/endpoints/test_task_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_endpoint.py
@@ -52,8 +52,11 @@ def configured_app(minimal_app_for_api):
class TestTaskEndpoint:
dag_id = "test_dag"
+ mapped_dag_id = "test_mapped_task"
task_id = "op1"
task_id2 = 'op2'
+ task_id3 = "op3"
+ mapped_task_id = "mapped_task"
task1_start_date = datetime(2020, 6, 15)
task2_start_date = datetime(2020, 6, 16)
@@ -63,9 +66,13 @@ class TestTaskEndpoint:
task1 = EmptyOperator(task_id=self.task_id, params={'foo': 'bar'})
task2 = EmptyOperator(task_id=self.task_id2, start_date=self.task2_start_date)
+ with DAG(self.mapped_dag_id, start_date=self.task1_start_date) as mapped_dag:
+ task3 = EmptyOperator(task_id=self.task_id3) # noqa
+ mapped_task = EmptyOperator.partial(task_id=self.mapped_task_id).expand() # noqa
+
task1 >> task2
dag_bag = DagBag(os.devnull, include_examples=False)
- dag_bag.dags = {dag.dag_id: dag}
+ dag_bag.dags = {dag.dag_id: dag, mapped_dag.dag_id: mapped_dag}
configured_app.dag_bag = dag_bag # type:ignore
@staticmethod
@@ -120,6 +127,7 @@ class TestGetTask(TestTaskEndpoint):
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
+ "is_mapped": False,
}
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks/{self.task_id}", environ_overrides={'REMOTE_USER': "test"}
@@ -127,6 +135,40 @@ class TestGetTask(TestTaskEndpoint):
assert response.status_code == 200
assert response.json == expected
+ def test_mapped_task(self):
+ expected = {
+ "class_ref": {"class_name": "EmptyOperator", "module_path": "airflow.operators.empty"},
+ "depends_on_past": False,
+ "downstream_task_ids": [],
+ "end_date": None,
+ "execution_timeout": None,
+ "extra_links": [],
+ "is_mapped": True,
+ "owner": "airflow",
+ "params": {},
+ "pool": "default_pool",
+ "pool_slots": 1.0,
+ "priority_weight": 1.0,
+ "queue": "default",
+ "retries": 0.0,
+ "retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300},
+ "retry_exponential_backoff": False,
+ "start_date": "2020-06-15T00:00:00+00:00",
+ "task_id": "mapped_task",
+ "template_fields": [],
+ "trigger_rule": "all_success",
+ "ui_color": "#e8f7e4",
+ "ui_fgcolor": "#000",
+ "wait_for_downstream": False,
+ "weight_rule": "downstream",
+ }
+ response = self.client.get(
+ f"/api/v1/dags/{self.mapped_dag_id}/tasks/{self.mapped_task_id}",
+ environ_overrides={'REMOTE_USER': "test"},
+ )
+ assert response.status_code == 200
+ assert response.json == expected
+
def test_should_respond_200_serialized(self):
# Get the dag out of the dagbag before we patch it to an empty one
@@ -170,6 +212,7 @@ class TestGetTask(TestTaskEndpoint):
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
+ "is_mapped": False,
}
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks/{self.task_id}", environ_overrides={'REMOTE_USER': "test"}
@@ -235,6 +278,7 @@ class TestGetTasks(TestTaskEndpoint):
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
+ "is_mapped": False,
},
{
"class_ref": {
@@ -263,6 +307,7 @@ class TestGetTasks(TestTaskEndpoint):
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
+ "is_mapped": False,
},
],
"total_entries": 2,
@@ -273,6 +318,73 @@ class TestGetTasks(TestTaskEndpoint):
assert response.status_code == 200
assert response.json == expected
+ def test_get_tasks_mapped(self):
+ expected = {
+ "tasks": [
+ {
+ "class_ref": {"class_name": "EmptyOperator", "module_path": "airflow.operators.empty"},
+ "depends_on_past": False,
+ "downstream_task_ids": [],
+ "end_date": None,
+ "execution_timeout": None,
+ "extra_links": [],
+ "is_mapped": True,
+ "owner": "airflow",
+ "params": {},
+ "pool": "default_pool",
+ "pool_slots": 1.0,
+ "priority_weight": 1.0,
+ "queue": "default",
+ "retries": 0.0,
+ "retry_delay": {"__type": "TimeDelta", "days": 0, "microseconds": 0, "seconds": 300},
+ "retry_exponential_backoff": False,
+ "start_date": "2020-06-15T00:00:00+00:00",
+ "task_id": "mapped_task",
+ "template_fields": [],
+ "trigger_rule": "all_success",
+ "ui_color": "#e8f7e4",
+ "ui_fgcolor": "#000",
+ "wait_for_downstream": False,
+ "weight_rule": "downstream",
+ },
+ {
+ "class_ref": {
+ "class_name": "EmptyOperator",
+ "module_path": "airflow.operators.empty",
+ },
+ "depends_on_past": False,
+ "downstream_task_ids": [],
+ "end_date": None,
+ "execution_timeout": None,
+ "extra_links": [],
+ "owner": "airflow",
+ "params": {},
+ "pool": "default_pool",
+ "pool_slots": 1.0,
+ "priority_weight": 1.0,
+ "queue": "default",
+ "retries": 0.0,
+ "retry_delay": {"__type": "TimeDelta", "days": 0, "seconds": 300, "microseconds": 0},
+ "retry_exponential_backoff": False,
+ "start_date": "2020-06-15T00:00:00+00:00",
+ "task_id": self.task_id3,
+ "template_fields": [],
+ "trigger_rule": "all_success",
+ "ui_color": "#e8f7e4",
+ "ui_fgcolor": "#000",
+ "wait_for_downstream": False,
+ "weight_rule": "downstream",
+ "is_mapped": False,
+ },
+ ],
+ "total_entries": 2,
+ }
+ response = self.client.get(
+ f"/api/v1/dags/{self.mapped_dag_id}/tasks", environ_overrides={'REMOTE_USER': "test"}
+ )
+ assert response.status_code == 200
+ assert response.json == expected
+
def test_should_respond_200_ascending_order_by_start_date(self):
response = self.client.get(
f"/api/v1/dags/{self.dag_id}/tasks?order_by=start_date",
diff --git a/tests/api_connexion/schemas/test_task_schema.py b/tests/api_connexion/schemas/test_task_schema.py
index 6724ec7d0b..07f4b592da 100644
--- a/tests/api_connexion/schemas/test_task_schema.py
+++ b/tests/api_connexion/schemas/test_task_schema.py
@@ -56,6 +56,7 @@ class TestTaskSchema:
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
+ "is_mapped": False,
}
assert expected == result
@@ -101,6 +102,7 @@ class TestTaskCollectionSchema:
"ui_fgcolor": "#000",
"wait_for_downstream": False,
"weight_rule": "downstream",
+ "is_mapped": False,
}
],
"total_entries": 1,