You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/06 07:16:43 UTC

[GitHub] [airflow] uranusjr commented on a diff in pull request #26165: Api endpoint update ti

uranusjr commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r963339263


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -543,3 +543,87 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_set_mapped_task_instance_state(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance.
+
+    The task instance is looked up by the ``dag_id``, ``dag_run_id``, ``task_id`` and
+    ``map_index`` arguments. The JSON request body is validated with
+    ``set_single_task_instance_state_form`` and supplies ``new_state`` and ``dry_run``.
+
+    :param dag_id: ID of the DAG that owns the task.
+    :param dag_run_id: Run ID of the DAG run the task instance belongs to.
+    :param task_id: ID of the task within the DAG.
+    :param map_index: Map index of the mapped task instance.
+    :param session: Database session used for the lookup and the state change.
+    :return: The task instance serialized with ``task_instance_reference_schema``.
+    :raises BadRequest: If the request body fails validation.
+    :raises NotFound: If the DAG, the task, or the task instance cannot be found.
+    """
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        # Chain the original error so the validation traceback is preserved.
+        raise BadRequest(detail=str(err.messages)) from err
+
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(f"Dag ID {dag_id} not found")
+
+    # Use the ``task_id`` argument as-is; it must not be overridden by a value
+    # taken from the request body, or the body could redirect the update to
+    # another task than the one this call was routed for.
+    if not dag.task_dict.get(task_id):
+        raise NotFound(f"Task ID {task_id} not found")
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+    if ti is None:
+        error_message = f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        raise NotFound(detail=error_message)
+
+    # With ``dry_run`` set, the task instance is returned without changing its state.
+    if not data["dry_run"]:
+        ti.set_state(data["new_state"], session=session)
+
+    return task_instance_reference_schema.dump(ti)
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_set_task_instance_state(*, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION) -> APIResponse:

Review Comment:
   It’s probably better to just name this `patch_task_instance` and allow setting more fields on the task instance. (We can implement only `state` in this PR and implement other fields later.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org