You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/07 17:47:37 UTC

[GitHub] [airflow] cedkoffeto commented on a diff in pull request #26165: Api endpoint update ti

cedkoffeto commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r965122705


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -543,3 +543,87 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_set_mapped_task_instance_state(*, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    task_id = data['task_id']
+    task = dag.task_dict.get(task_id)
+
+    if not task:
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)
+
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if ti is None:
+        error_message = f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        raise NotFound(detail=error_message)
+
+    if not data["dry_run"]:
+        ti.set_state(data["new_state"], session=session)
+
+    return task_instance_reference_schema.dump(ti)
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_set_task_instance_state(*, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION) -> APIResponse:

Review Comment:
   Yes, good point. I'll change it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org