Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/09/05 17:46:03 UTC

[GitHub] [airflow] cedkoffeto opened a new pull request, #26165: Api endpoint update ti

cedkoffeto opened a new pull request, #26165:
URL: https://github.com/apache/airflow/pull/26165

   This PR adds a PATCH method to the following two endpoints, for updating the state of a single task instance:
   - /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}
   - /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}
   
   closes: #23246
   related: #23227
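
As an illustration of how a client might call the new endpoints, the sketch below builds (but does not send) a PATCH request with Python's standard library. The host `http://localhost:8080`, the helper name `build_patch_request`, and the example IDs are assumptions made for illustration; the `/api/v1` prefix and the `dry_run` / `new_state` body fields follow the tests quoted later in this thread. Authentication is deliberately left out.

```python
import json
import urllib.request
from urllib.parse import quote

# Hypothetical base URL; adjust for the deployment in question.
BASE_URL = "http://localhost:8080/api/v1"


def build_patch_request(dag_id, dag_run_id, task_id, new_state, dry_run=True, map_index=None):
    """Build (without sending) a PATCH request for a single task instance."""
    path = (
        f"/dags/{quote(dag_id, safe='')}"
        f"/dagRuns/{quote(dag_run_id, safe='')}"
        f"/taskInstances/{quote(task_id, safe='')}"
    )
    # The second endpoint variant appends the map index to the path.
    if map_index is not None:
        path += f"/{map_index}"
    body = json.dumps({"dry_run": dry_run, "new_state": new_state}).encode("utf-8")
    return urllib.request.Request(
        BASE_URL + path,
        data=body,
        method="PATCH",
        headers={"Content-Type": "application/json"},
    )


req = build_patch_request(
    "example_python_operator", "TEST_DAG_RUN_ID", "print_the_context", "failed"
)
print(req.get_method(), req.full_url)
```

Defaulting `dry_run` to `True` is a choice made in this sketch so that an accidental call changes nothing; it is not necessarily the API's default.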
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk merged pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #26165:
URL: https://github.com/apache/airflow/pull/26165




[GitHub] [airflow] uranusjr commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r963339263


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -543,3 +543,87 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_set_mapped_task_instance_state(*, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    task_id = data['task_id']
+    task = dag.task_dict.get(task_id)
+
+    if not task:
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)
+
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if ti is None:
+        error_message = f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        raise NotFound(detail=error_message)
+
+    if not data["dry_run"]:
+        ti.set_state(data["new_state"], session=session)
+
+    return task_instance_reference_schema.dump(ti)
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_set_task_instance_state(*, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION) -> APIResponse:

Review Comment:
   It’s probably better to just name this `patch_task_instance` and allow setting more fields on the task instance. (We can implement only `state` in this PR and implement other fields later.)
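
One way to read this suggestion is a general patch handler that accepts a set of updatable fields and, for now, supports only one of them. The sketch below is a hypothetical illustration of that shape, not the code in this PR: `FakeTaskInstance`, `PATCHABLE_FIELDS`, and `apply_patch` are invented names, and the field name `state` is an assumption.

```python
from dataclasses import dataclass
from typing import Optional

# Fields this hypothetical handler knows how to update; only "state" for now.
# Supporting another field later would mean adding its name here.
PATCHABLE_FIELDS = {"state"}


@dataclass
class FakeTaskInstance:
    """Stand-in for a task instance; this is not Airflow's TaskInstance model."""

    task_id: str
    state: Optional[str] = None


def apply_patch(ti, changes):
    """Apply the requested changes, rejecting any field that is not supported yet."""
    unsupported = set(changes) - PATCHABLE_FIELDS
    if unsupported:
        raise ValueError(f"Unsupported fields: {sorted(unsupported)}")
    for name, value in changes.items():
        setattr(ti, name, value)
    return ti
```

Rejecting unknown fields explicitly, rather than ignoring them, keeps the endpoint's behavior predictable while the list of supported fields grows.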





[GitHub] [airflow] uranusjr commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r978372009


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -543,3 +545,103 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(*, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    task = dag.task_dict.get(task_id)
+
+    if not task:
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)

Review Comment:
   ```suggestion
       if not dag.has_task(task_id):
           error_message = f"Task ID {task_id} not found"
           raise NotFound(error_message)
   ```
   
   Similarly, we can make this cheaper.
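
The difference between the two checks can be sketched with a tiny stand-in class. `MiniDag` below is hypothetical and is not Airflow's `DAG`; it assumes that `has_task` is a plain membership test on `task_dict`, which is how the suggestion above appears to use it.

```python
class MiniDag:
    """Tiny stand-in for a DAG; this is not Airflow's DAG class."""

    def __init__(self, task_ids):
        # Map task_id -> task object (plain strings here for brevity).
        self.task_dict = {task_id: f"<task {task_id}>" for task_id in task_ids}

    def has_task(self, task_id):
        # Pure membership test; no task object is retrieved or bound to a name.
        return task_id in self.task_dict


def check_task_exists(dag, task_id):
    """Raise if the task is unknown, mirroring the NotFound branch in the diff."""
    if not dag.has_task(task_id):
        raise LookupError(f"Task ID {task_id} not found")
```

Since the handler never uses the task object afterwards, the membership test also makes the intent of the check clearer.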





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r1003648122


##########
tests/api_connexion/endpoints/test_task_instance_endpoint.py:
##########
@@ -1582,3 +1582,226 @@ def test_should_raise_400_for_naive_and_bad_datetime(self, payload, expected, se
         )
         assert response.status_code == 400
         assert response.json['detail'] == expected
+
+
+class TestPatchTaskInstance(TestTaskInstanceEndpoint):
+    ENDPOINT_URL = (
+        "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context"
+    )
+
+    @mock.patch('airflow.models.dag.DAG.set_task_instance_state')
+    def test_should_call_mocked_api(self, mock_set_task_instance_state, session):
+        self.create_task_instances(session)
+
+        NEW_STATE = "failed"
+        mock_set_task_instance_state.return_value = session.query(TaskInstance).get(
+            {
+                "task_id": "print_the_context",
+                "dag_id": "example_python_operator",
+                "run_id": "TEST_DAG_RUN_ID",
+                "map_index": -1,
+            }
+        )
+        response = self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": False,
+                "new_state": NEW_STATE,
+            },
+        )
+        assert response.status_code == 200
+        assert response.json == {
+            'dag_id': 'example_python_operator',
+            'dag_run_id': 'TEST_DAG_RUN_ID',
+            'execution_date': '2020-01-01T00:00:00+00:00',
+            'task_id': 'print_the_context',
+        }
+
+        mock_set_task_instance_state.assert_called_once_with(
+            task_id="print_the_context",
+            run_id="TEST_DAG_RUN_ID",
+            map_indexes=[-1],
+            state=NEW_STATE,
+            commit=True,
+            session=session,
+        )
+
+    @mock.patch('airflow.models.dag.DAG.set_task_instance_state')
+    def test_should_not_call_mocked_api_for_dry_run(self, mock_set_task_instance_state, session):
+        self.create_task_instances(session)
+
+        NEW_STATE = "failed"
+        mock_set_task_instance_state.return_value = session.query(TaskInstance).get(
+            {
+                "task_id": "print_the_context",
+                "dag_id": "example_python_operator",
+                "run_id": "TEST_DAG_RUN_ID",
+                "map_index": -1,
+            }
+        )
+        response = self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": True,
+                "new_state": NEW_STATE,
+            },
+        )
+        assert response.status_code == 200
+        print(response.status_code)
+        assert response.json == {
+            'dag_id': 'example_python_operator',
+            'dag_run_id': 'TEST_DAG_RUN_ID',
+            'execution_date': '2020-01-01T00:00:00+00:00',
+            'task_id': 'print_the_context',
+        }
+
+        mock_set_task_instance_state.assert_not_called()
+
+    def test_should_update_task_instance_state(self, session):
+        self.create_task_instances(session)
+
+        NEW_STATE = "failed"
+
+        self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": False,
+                "new_state": NEW_STATE,
+            },
+        )
+
+        response2 = self.client.get(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={},
+        )
+        assert response2.status_code == 200
+        assert response2.json["state"] == NEW_STATE
+
+    def test_should_update_mapped_task_instance_state(self, session):
+        tis = self.create_task_instances(session)
+        session.query()
+        ti = tis[0]
+        ti.map_index = 1
+        rendered_fields = RTIF(ti, render_templates=False)
+        session.add(rendered_fields)
+        session.commit()
+
+        NEW_STATE = "failed"
+
+        self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": False,
+                "map_index": 1,
+                "new_state": NEW_STATE,
+            },
+        )
+
+        response2 = self.client.get(
+            f"{self.ENDPOINT_URL}/1",
+            environ_overrides={'REMOTE_USER': "test"},
+            json={},
+        )

Review Comment:
   What's the update on this?





[GitHub] [airflow] cedkoffeto commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoffeto commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r965122705


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -543,3 +543,87 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_set_mapped_task_instance_state(*, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    task_id = data['task_id']
+    task = dag.task_dict.get(task_id)
+
+    if not task:
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)
+
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if ti is None:
+        error_message = f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        raise NotFound(detail=error_message)
+
+    if not data["dry_run"]:
+        ti.set_state(data["new_state"], session=session)
+
+    return task_instance_reference_schema.dump(ti)
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_set_task_instance_state(*, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION) -> APIResponse:

Review Comment:
   Yes, thanks @uranusjr, I'll change it.





[GitHub] [airflow] potiuk commented on pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26165:
URL: https://github.com/apache/airflow/pull/26165#issuecomment-1255172700

   FYI, TP was on vacation. Before he gets back, I think it would help if you rebased and fixed the failing static checks, @cedkoffeto.




[GitHub] [airflow] uranusjr commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r996742047


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,100 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound("DAG not found", detail=f"DAG {dag_id!r} not found")
+
+    if not dag.has_task(task_id):
+        raise NotFound("Task not found", detail=f"Task {task_id!r} not found in DAG {dag_id!r}")
+
+    ti: TI | None = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if not ti:
+        error_message = (
+            f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        )
+        raise NotFound(detail=error_message)
+
+    if not data["dry_run"]:
+        ti = dag.set_task_instance_state(
+            task_id=task_id,
+            run_id=dag_run_id,
+            map_indexes=[map_index],
+            state=data["new_state"],
+            commit=True,
+            session=session,
+        )
+
+    return task_instance_reference_schema.dump(ti)
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session

Review Comment:
   You’re correct





[GitHub] [airflow] uranusjr commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r978369587


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -543,3 +545,103 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(*, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    task = dag.task_dict.get(task_id)
+
+    if not task:
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if ti is None:
+        error_message = f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        raise NotFound(detail=error_message)

Review Comment:
   We don’t really use the `ti` here, so it’s probably better to use `count()` instead, e.g.:
   
   ```python
   ti_count = session.query(func.count()).filter(TI.task_id == task_id, ...).scalar()
   if not ti_count:
       raise NotFound(...)
   ```
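
The same idea, counting matching rows instead of loading one, can be tried without Airflow or SQLAlchemy. The sketch below swaps in the standard-library `sqlite3` module and a simplified, hypothetical `task_instance` table, so it illustrates only the query shape, not the ORM call suggested above.

```python
import sqlite3

# In-memory database with a simplified, hypothetical task_instance table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER)"
)
conn.execute(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    ("example_python_operator", "TEST_DAG_RUN_ID", "print_the_context", -1),
)


def task_instance_exists(conn, dag_id, run_id, task_id, map_index):
    """Return True if a matching row exists, without loading the row itself."""
    (count,) = conn.execute(
        "SELECT COUNT(*) FROM task_instance "
        "WHERE dag_id = ? AND run_id = ? AND task_id = ? AND map_index = ?",
        (dag_id, run_id, task_id, map_index),
    ).fetchone()
    return count > 0
```

Only a single integer crosses the database boundary here, which is the saving the comment is after when the row's contents are not needed.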





[GitHub] [airflow] cedkoffeto commented on pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoffeto commented on PR #26165:
URL: https://github.com/apache/airflow/pull/26165#issuecomment-1265786871

   Hi @Bowrna, yeah, I will fix them soon. I was a little busy these days.




[GitHub] [airflow] uranusjr commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r1006568593


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,51 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    map_index = data['map_index']

Review Comment:
   Would it be possible to put `map_index` in the URL instead of the request body? It is a bit inconsistent to have the `map_index`-in-URL variant for GET but not for PATCH.
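
To make the URL-based variant concrete, the sketch below parses both path shapes from the PR description with one regular expression and falls back to `-1` when no map index is present, the value the quoted tests use for an unmapped task instance. The names `TI_PATH` and `parse_task_instance_path` are hypothetical, and real routing is handled by the API framework rather than by a regex like this.

```python
import re

# Matches both endpoint shapes; the trailing /{map_index} segment is optional.
TI_PATH = re.compile(
    r"^/dags/(?P<dag_id>[^/]+)/dagRuns/(?P<dag_run_id>[^/]+)"
    r"/taskInstances/(?P<task_id>[^/]+)(?:/(?P<map_index>-?\d+))?$"
)


def parse_task_instance_path(path):
    """Extract the identifying parameters of a task instance from a URL path."""
    match = TI_PATH.match(path)
    if match is None:
        raise ValueError(f"Not a task instance path: {path}")
    params = match.groupdict()
    # No map index in the URL means an unmapped task instance (-1 in the tests).
    raw_index = params["map_index"]
    params["map_index"] = int(raw_index) if raw_index is not None else -1
    return params
```

With the index carried in the path, GET and PATCH address a mapped task instance in the same way, and the request body only describes the change.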





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r1001845249


##########
tests/api_connexion/endpoints/test_task_instance_endpoint.py:
##########
@@ -1582,3 +1582,226 @@ def test_should_raise_400_for_naive_and_bad_datetime(self, payload, expected, se
         )
         assert response.status_code == 400
         assert response.json['detail'] == expected
+
+
+class TestPatchTaskInstance(TestTaskInstanceEndpoint):
+    ENDPOINT_URL = (
+        "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context"
+    )
+
+    @mock.patch('airflow.models.dag.DAG.set_task_instance_state')
+    def test_should_call_mocked_api(self, mock_set_task_instance_state, session):
+        self.create_task_instances(session)
+
+        NEW_STATE = "failed"
+        mock_set_task_instance_state.return_value = session.query(TaskInstance).get(
+            {
+                "task_id": "print_the_context",
+                "dag_id": "example_python_operator",
+                "run_id": "TEST_DAG_RUN_ID",
+                "map_index": -1,
+            }
+        )
+        response = self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": False,
+                "new_state": NEW_STATE,
+            },
+        )
+        assert response.status_code == 200
+        assert response.json == {
+            'dag_id': 'example_python_operator',
+            'dag_run_id': 'TEST_DAG_RUN_ID',
+            'execution_date': '2020-01-01T00:00:00+00:00',
+            'task_id': 'print_the_context',
+        }
+
+        mock_set_task_instance_state.assert_called_once_with(
+            task_id="print_the_context",
+            run_id="TEST_DAG_RUN_ID",
+            map_indexes=[-1],
+            state=NEW_STATE,
+            commit=True,
+            session=session,
+        )
+
+    @mock.patch('airflow.models.dag.DAG.set_task_instance_state')
+    def test_should_not_call_mocked_api_for_dry_run(self, mock_set_task_instance_state, session):
+        self.create_task_instances(session)
+
+        NEW_STATE = "failed"
+        mock_set_task_instance_state.return_value = session.query(TaskInstance).get(
+            {
+                "task_id": "print_the_context",
+                "dag_id": "example_python_operator",
+                "run_id": "TEST_DAG_RUN_ID",
+                "map_index": -1,
+            }
+        )
+        response = self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": True,
+                "new_state": NEW_STATE,
+            },
+        )
+        assert response.status_code == 200
+        print(response.status_code)
+        assert response.json == {
+            'dag_id': 'example_python_operator',
+            'dag_run_id': 'TEST_DAG_RUN_ID',
+            'execution_date': '2020-01-01T00:00:00+00:00',
+            'task_id': 'print_the_context',
+        }
+
+        mock_set_task_instance_state.assert_not_called()
+
+    def test_should_update_task_instance_state(self, session):
+        self.create_task_instances(session)
+
+        NEW_STATE = "failed"
+
+        self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": False,
+                "new_state": NEW_STATE,
+            },
+        )
+
+        response2 = self.client.get(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={},
+        )
+        assert response2.status_code == 200
+        assert response2.json["state"] == NEW_STATE
+
+    def test_should_update_mapped_task_instance_state(self, session):
+        tis = self.create_task_instances(session)
+        session.query()

Review Comment:
   Why did we make this empty query?



##########
tests/api_connexion/endpoints/test_task_instance_endpoint.py:
##########
@@ -1582,3 +1582,226 @@ def test_should_raise_400_for_naive_and_bad_datetime(self, payload, expected, se
         )
         assert response.status_code == 400
         assert response.json['detail'] == expected
+
+
+class TestPatchTaskInstance(TestTaskInstanceEndpoint):
+    ENDPOINT_URL = (
+        "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context"
+    )
+
+    @mock.patch('airflow.models.dag.DAG.set_task_instance_state')
+    def test_should_call_mocked_api(self, mock_set_task_instance_state, session):
+        self.create_task_instances(session)
+
+        NEW_STATE = "failed"
+        mock_set_task_instance_state.return_value = session.query(TaskInstance).get(
+            {
+                "task_id": "print_the_context",
+                "dag_id": "example_python_operator",
+                "run_id": "TEST_DAG_RUN_ID",
+                "map_index": -1,
+            }
+        )
+        response = self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": False,
+                "new_state": NEW_STATE,
+            },
+        )
+        assert response.status_code == 200
+        assert response.json == {
+            'dag_id': 'example_python_operator',
+            'dag_run_id': 'TEST_DAG_RUN_ID',
+            'execution_date': '2020-01-01T00:00:00+00:00',
+            'task_id': 'print_the_context',
+        }
+
+        mock_set_task_instance_state.assert_called_once_with(
+            task_id="print_the_context",
+            run_id="TEST_DAG_RUN_ID",
+            map_indexes=[-1],
+            state=NEW_STATE,
+            commit=True,
+            session=session,
+        )
+
+    @mock.patch('airflow.models.dag.DAG.set_task_instance_state')
+    def test_should_not_call_mocked_api_for_dry_run(self, mock_set_task_instance_state, session):
+        self.create_task_instances(session)
+
+        NEW_STATE = "failed"
+        mock_set_task_instance_state.return_value = session.query(TaskInstance).get(
+            {
+                "task_id": "print_the_context",
+                "dag_id": "example_python_operator",
+                "run_id": "TEST_DAG_RUN_ID",
+                "map_index": -1,
+            }
+        )
+        response = self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": True,
+                "new_state": NEW_STATE,
+            },
+        )
+        assert response.status_code == 200
+        print(response.status_code)
+        assert response.json == {
+            'dag_id': 'example_python_operator',
+            'dag_run_id': 'TEST_DAG_RUN_ID',
+            'execution_date': '2020-01-01T00:00:00+00:00',
+            'task_id': 'print_the_context',
+        }
+
+        mock_set_task_instance_state.assert_not_called()
+
+    def test_should_update_task_instance_state(self, session):
+        self.create_task_instances(session)
+
+        NEW_STATE = "failed"
+
+        self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": False,
+                "new_state": NEW_STATE,
+            },
+        )
+
+        response2 = self.client.get(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={},
+        )
+        assert response2.status_code == 200
+        assert response2.json["state"] == NEW_STATE
+
+    def test_should_update_mapped_task_instance_state(self, session):
+        tis = self.create_task_instances(session)
+        session.query()
+        ti = tis[0]
+        ti.map_index = 1
+        rendered_fields = RTIF(ti, render_templates=False)
+        session.add(rendered_fields)
+        session.commit()
+
+        NEW_STATE = "failed"
+
+        self.client.patch(
+            self.ENDPOINT_URL,
+            environ_overrides={'REMOTE_USER': "test"},
+            json={
+                "dry_run": False,
+                "map_index": 1,
+                "new_state": NEW_STATE,
+            },
+        )
+
+        response2 = self.client.get(
+            f"{self.ENDPOINT_URL}/1",
+            environ_overrides={'REMOTE_USER': "test"},
+            json={},
+        )

Review Comment:
   Is this second request necessary? If it is, what's the meaning of the `/1`?
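For context on the `/1` suffix: the PR description lists a second route ending in `/{map_index}`, so `f"{self.ENDPOINT_URL}/1"` presumably addresses the mapped task instance whose `map_index` is 1. A minimal sketch of how the two paths relate, assuming the path templates from the PR description; `task_instance_path` is a hypothetical helper, not part of Airflow:

```python
from typing import Optional


def task_instance_path(
    dag_id: str, dag_run_id: str, task_id: str, map_index: Optional[int] = None
) -> str:
    """Build the task-instance path; append the map index only for mapped TIs."""
    path = f"/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
    if map_index is not None:
        path = f"{path}/{map_index}"
    return path


# Hypothetical usage with the identifiers that appear in the test above.
unmapped = task_instance_path("example_python_operator", "TEST_DAG_RUN_ID", "print_the_context")
mapped = task_instance_path(
    "example_python_operator", "TEST_DAG_RUN_ID", "print_the_context", map_index=1
)
print(unmapped)
print(mapped)
```

Under that reading, the second request in the test reads back the mapped instance that the PATCH was meant to change.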



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] cedkoffeto commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoffeto commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r996485300


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -543,3 +545,103 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(*, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    task = dag.task_dict.get(task_id)
+
+    if not task:
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if ti is None:
+        error_message = f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        raise NotFound(detail=error_message)

Review Comment:
   Hi @uranusjr 🤔 For this part, I return the requested TI (without updating it) in the case of a dry run; that's why I'm not just counting. What do you think?
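The dry-run behaviour described here can be sketched in plain Python. This is only an illustration under assumptions: `FakeTI`, `patch_state`, and the in-memory `store` are hypothetical stand-ins for the ORM row, the endpoint body, and the database session; none of them are Airflow APIs.

```python
from dataclasses import dataclass, replace
from typing import Dict, Tuple

# Key mirrors the lookup in the diff: (dag_id, run_id, task_id, map_index).
TIKey = Tuple[str, str, str, int]


@dataclass(frozen=True)
class FakeTI:
    """Hypothetical stand-in for a task instance row."""

    task_id: str
    state: str


def patch_state(
    store: Dict[TIKey, FakeTI], key: TIKey, new_state: str, dry_run: bool = True
) -> FakeTI:
    ti = store.get(key)
    if ti is None:
        raise LookupError(f"Task instance {key!r} not found")
    if dry_run:
        # Report the instance that would be affected, but change nothing.
        return ti
    updated = replace(ti, state=new_state)
    store[key] = updated
    return updated


key = ("example_python_operator", "TEST_DAG_RUN_ID", "print_the_context", -1)
store = {key: FakeTI(task_id="print_the_context", state="running")}

preview = patch_state(store, key, "failed", dry_run=True)   # nothing is written
applied = patch_state(store, key, "failed", dry_run=False)  # state is updated
```

In both branches the caller gets a task instance back, which is why the response can use the same reference schema whether or not the change was committed.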





[GitHub] [airflow] cedkoffeto commented on pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoffeto commented on PR #26165:
URL: https://github.com/apache/airflow/pull/26165#issuecomment-1250073446

   Hi @uranusjr, can you check my changes, please? Thank you.
   




[GitHub] [airflow] Bowrna commented on pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
Bowrna commented on PR #26165:
URL: https://github.com/apache/airflow/pull/26165#issuecomment-1265699979

   There are a few static checks failing in the CI pipeline too, @cedkoffeto.




[GitHub] [airflow] cedkoffeto commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoffeto commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r999745139


##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -4424,6 +4477,14 @@ components:
       required: true
       description: The DAG run ID.
 
+    TaskInstanceID:

Review Comment:
   It's not; I'm removing it.





[GitHub] [airflow] cedkoffeto commented on pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoffeto commented on PR #26165:
URL: https://github.com/apache/airflow/pull/26165#issuecomment-1284621075

   @ephraimbuddy, please, can we close this PR, or should I request a second review from @uranusjr?




[GitHub] [airflow] uranusjr commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r996552621


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,102 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    if not dag.has_task(task_id):
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if not ti:
+        error_message = (
+            f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        )
+        raise NotFound(detail=error_message)
+
+    if not data["dry_run"]:
+        ti = dag.set_task_instance_state(
+            task_id=task_id,
+            run_id=dag_run_id,
+            map_indexes=[map_index],
+            state=data["new_state"],
+            commit=not data["dry_run"],
+            session=session,
+        )

Review Comment:
   ```suggestion
           ti = dag.set_task_instance_state(
               task_id=task_id,
               run_id=dag_run_id,
               map_indexes=[map_index],
               state=data["new_state"],
               commit=True,
               session=session,
           )
   ```



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,102 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)

Review Comment:
   ```suggestion
       dag = get_airflow_app().dag_bag.get_dag(dag_id)
       if not dag:
           raise NotFound("DAG not found", detail=f"DAG {dag_id!r} not found")
   ```



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,102 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    if not dag.has_task(task_id):
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )

Review Comment:
   ```suggestion
       ti: TI | None = session.query(TI).get(
           {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
       )
   ```



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,102 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    if not dag.has_task(task_id):
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)

Review Comment:
   ```suggestion
       if not dag.has_task(task_id):
           raise NotFound("Task not found", detail=f"Task {task_id!r} not found in DAG {dag_id!r}")
   ```



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,102 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    if not dag.has_task(task_id):
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if not ti:
+        error_message = (
+            f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        )
+        raise NotFound(detail=error_message)
+
+    if not data["dry_run"]:
+        ti = dag.set_task_instance_state(
+            task_id=task_id,
+            run_id=dag_run_id,
+            map_indexes=[map_index],
+            state=data["new_state"],
+            commit=not data["dry_run"],
+            session=session,
+        )
+
+    return task_instance_reference_schema.dump(ti)
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION
+) -> APIResponse:

Review Comment:
   This and `patch_mapped_task_instance` are so similar that they can simply be a single endpoint. I also wonder if we should have an endpoint (either this one directly or a separate one) to clear _all task instances of one mapped task_.
   
   (The two endpoint declarations in OpenAPI can also be de-duplicated a bit with components.)
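One way to picture the merge: since the tests in this PR address unmapped task instances with `map_index=-1`, a single function with that default can serve both cases. A minimal sketch, assuming an in-memory dict in place of the database; `set_state` and `states` are hypothetical names, not Airflow's implementation:

```python
from typing import Dict, Tuple

TIKey = Tuple[str, str, str, int]  # (dag_id, run_id, task_id, map_index)


def set_state(
    states: Dict[TIKey, str],
    *,
    dag_id: str,
    dag_run_id: str,
    task_id: str,
    new_state: str,
    map_index: int = -1,
) -> TIKey:
    """Single code path: unmapped task instances are addressed with map_index=-1."""
    key = (dag_id, dag_run_id, task_id, map_index)
    if key not in states:
        raise LookupError(f"Task instance {key!r} not found")
    states[key] = new_state
    return key


states = {
    ("dag", "run", "plain_task", -1): "running",
    ("dag", "run", "mapped_task", 0): "running",
    ("dag", "run", "mapped_task", 1): "running",
}

# Unmapped task: map_index is omitted and falls back to -1.
set_state(states, dag_id="dag", dag_run_id="run", task_id="plain_task", new_state="failed")
# Mapped task: the same function, with an explicit index.
set_state(
    states, dag_id="dag", dag_run_id="run", task_id="mapped_task", new_state="failed", map_index=1
)
```

Updating every instance of a mapped task would need a different shape (for example a list of indexes), which this sketch does not attempt.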





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #26165:
URL: https://github.com/apache/airflow/pull/26165#issuecomment-1296562079

   Awesome work, congrats on your first merged pull request!
   




[GitHub] [airflow] uranusjr commented on pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
uranusjr commented on PR #26165:
URL: https://github.com/apache/airflow/pull/26165#issuecomment-1295935491

   Need to fix the static checks.




[GitHub] [airflow] uranusjr commented on pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
uranusjr commented on PR #26165:
URL: https://github.com/apache/airflow/pull/26165#issuecomment-1240044768

   This also needs some tests.




[GitHub] [airflow] cedkoffeto commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoffeto commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r996695477


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,102 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    if not dag.has_task(task_id):
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if not ti:
+        error_message = (
+            f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        )
+        raise NotFound(detail=error_message)
+
+    if not data["dry_run"]:
+        ti = dag.set_task_instance_state(
+            task_id=task_id,
+            run_id=dag_run_id,
+            map_indexes=[map_index],
+            state=data["new_state"],
+            commit=not data["dry_run"],
+            session=session,
+        )
+
+    return task_instance_reference_schema.dump(ti)
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION
+) -> APIResponse:

Review Comment:
   Yes, you're right @uranusjr, we can de-duplicate them.
   For updating all mapped task instances, I think we can also keep this same endpoint
   and add a `map_indexes` parameter for updating a subset of mapped task instances.





[GitHub] [airflow] cedkoffeto commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoffeto commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r997353975


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,100 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""

Review Comment:
   Thanks for the feedback, @ephraimbuddy. There is another endpoint for updating a simple task instance, but @uranusjr suggested de-duplicating them, so I'm going to update it.





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r996669725


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,100 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION

Review Comment:
   Should map_index default to -1?



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,100 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""

Review Comment:
   ```suggestion
       """Update the state of a task instance."""
   ```
   I think we shouldn't make this only for mapped task instances. Why do you think it should only be for mapped tasks?



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,100 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound("DAG not found", detail=f"DAG {dag_id!r} not found")
+
+    if not dag.has_task(task_id):
+        raise NotFound("Task not found", detail=f"Task {task_id!r} not found in DAG {dag_id!r}")
+
+    ti: TI | None = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if not ti:
+        error_message = (
+            f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        )
+        raise NotFound(detail=error_message)
+
+    if not data["dry_run"]:
+        ti = dag.set_task_instance_state(
+            task_id=task_id,
+            run_id=dag_run_id,
+            map_indexes=[map_index],
+            state=data["new_state"],
+            commit=True,
+            session=session,
+        )
+
+    return task_instance_reference_schema.dump(ti)
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session

Review Comment:
   Defaulting `map_index` to -1, as I suggested, should make the other one work for mapped tasks as well as ordinary tasks, or am I missing something?





[GitHub] [airflow] cedkoo commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoo commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r997352885


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,100 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_mapped_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""

Review Comment:
   Thanks for the feedback, @ephraimbuddy. There is another endpoint for updating a simple task instance, but @uranusjr suggested de-duplicating them, so I'm going to update it.





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r999646428


##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -1144,6 +1144,35 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+    patch:
+      summary: Updates the state of a task instance
+      description: >
+        Updates the state for single task instance.

Review Comment:
   ```suggestion
           Updates the state for single task instance.
           
           *New in version 2.5.0*
   ```



##########
airflow/api_connexion/schemas/task_instance_schema.py:
##########
@@ -166,6 +166,14 @@ def validate_form(self, data, **kwargs):
             raise ValidationError("Exactly one of execution_date or dag_run_id must be provided")
 
 
+class SetSingleTaskInstanceStateFormSchema(Schema):
+    """Schema for handling the request of updating state of a single task instance"""
+
+    dry_run = fields.Boolean(dump_default=True)
+    map_index = fields.Int(dump_default=-1)

Review Comment:
   ```suggestion
       map_index = fields.Int(load_default=-1)
   ```



##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -4424,6 +4477,14 @@ components:
       required: true
       description: The DAG run ID.
 
+    TaskInstanceID:

Review Comment:
   Is this used anywhere? 



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,51 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    map_index = -1 if "map_index" not in data else data.get("map_index")

Review Comment:
   ```suggestion
       map_index = data['map_index']
   ```
   Any reason why we are not defaulting the map_index to -1 in the schema?
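
The question above turns on when a schema default is applied. A load-time default is filled in while the request body is deserialized, whereas a dump-time default only applies when an object is serialized for the response. The following is a minimal plain-Python sketch of that distinction. `IntField` and `load` are hypothetical stand-ins written for illustration; they are not marshmallow's classes, and only the parameter names `load_default` and `dump_default` are taken from the diff.

```python
from typing import Any, Dict

# Sentinel meaning "no default configured" (hypothetical helper for this sketch).
_MISSING = object()


class IntField:
    """Hypothetical stand-in for an integer schema field; not marshmallow itself."""

    def __init__(self, load_default: Any = _MISSING, dump_default: Any = _MISSING) -> None:
        self.load_default = load_default
        self.dump_default = dump_default


def load(fields: Dict[str, IntField], payload: Dict[str, Any]) -> Dict[str, Any]:
    """Deserialize a request body, applying only load-time defaults."""
    result: Dict[str, Any] = {}
    for name, field in fields.items():
        if name in payload:
            result[name] = int(payload[name])
        elif field.load_default is not _MISSING:
            result[name] = field.load_default
        # dump_default is deliberately ignored here: it only matters when serializing.
    return result


with_dump_default = {"map_index": IntField(dump_default=-1)}
with_load_default = {"map_index": IntField(load_default=-1)}

# With only a dump-time default, the key is absent after loading an empty body,
# so the handler would need its own fallback.
loaded_dump = load(with_dump_default, {})

# With a load-time default, the key is always present and data["map_index"] is safe.
loaded_load = load(with_load_default, {})
```

Under this reading, switching the field to a load-time default is what would make the simpler `data['map_index']` lookup in the handler safe.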



##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -545,3 +547,51 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_task_instance(
+    *, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION
+) -> APIResponse:
+    """Update the state of a mapped task instance."""

Review Comment:
   ```suggestion
       """Update the state of a task instance."""
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] boring-cyborg[bot] commented on pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #26165:
URL: https://github.com/apache/airflow/pull/26165#issuecomment-1237341841

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally. It's a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   




[GitHub] [airflow] cedkoffeto commented on pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoffeto commented on PR #26165:
URL: https://github.com/apache/airflow/pull/26165#issuecomment-1237493107

   Hi @mik-laj @ephraimbuddy
   I'm writing tests, but I wanted your feedback on adding a PATCH method to the specified endpoints. Does this seem like the best approach to you?
   Thanks
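
For reference, a call to one of the proposed endpoints might look roughly like the sketch below. It only builds the request and does not send it. The base URL, the example IDs, and the helper name `build_patch_request` are assumptions made for illustration; the `dry_run` and `new_state` body fields are taken from the handler code in this PR, and the final request shape may differ once the review settles.

```python
import json
from urllib.parse import quote
from urllib.request import Request

# Assumption: a local Airflow webserver exposing the stable REST API under /api/v1.
BASE_URL = "http://localhost:8080/api/v1"


def build_patch_request(
    dag_id: str, dag_run_id: str, task_id: str, new_state: str, dry_run: bool = True
) -> Request:
    """Build (but do not send) a PATCH request for a single task instance."""
    url = (
        f"{BASE_URL}/dags/{quote(dag_id, safe='')}"
        f"/dagRuns/{quote(dag_run_id, safe='')}"
        f"/taskInstances/{quote(task_id, safe='')}"
    )
    # Body fields mirror what the handler reads: data["dry_run"] and data["new_state"].
    body = json.dumps({"dry_run": dry_run, "new_state": new_state}).encode("utf-8")
    return Request(url, data=body, method="PATCH", headers={"Content-Type": "application/json"})


# Hypothetical identifiers, used only to show the resulting URL and body.
request = build_patch_request("example_dag", "manual_run_1", "my_task", "failed")
```

Keeping `dry_run` true by default in the sketch matches the cautious reading of the handler, where the state is only changed when `dry_run` is false.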




[GitHub] [airflow] cedkoffeto commented on a diff in pull request #26165: Api endpoint update ti

Posted by GitBox <gi...@apache.org>.
cedkoffeto commented on code in PR #26165:
URL: https://github.com/apache/airflow/pull/26165#discussion_r965122705


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -543,3 +543,87 @@ def post_set_task_instances_state(*, dag_id: str, session: Session = NEW_SESSION
         session=session,
     )
     return task_instance_reference_collection_schema.dump(TaskInstanceReferenceCollection(task_instances=tis))
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_set_mapped_task_instance_state(*, dag_id: str, dag_run_id: str, task_id: str, map_index: int, session: Session = NEW_SESSION) -> APIResponse:
+    """Update the state of a mapped task instance."""
+    body = get_json_request_dict()
+    try:
+        data = set_single_task_instance_state_form.load(body)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+
+    error_message = f"Dag ID {dag_id} not found"
+    dag = get_airflow_app().dag_bag.get_dag(dag_id)
+    if not dag:
+        raise NotFound(error_message)
+
+    task_id = data['task_id']
+    task = dag.task_dict.get(task_id)
+
+    if not task:
+        error_message = f"Task ID {task_id} not found"
+        raise NotFound(error_message)
+
+
+    ti: TI = session.query(TI).get(
+        {'task_id': task_id, 'dag_id': dag_id, 'run_id': dag_run_id, 'map_index': map_index}
+    )
+
+    if ti is None:
+        error_message = f"Mapped task instance not found for task {task_id!r} on DAG run with ID {dag_run_id!r}"
+        raise NotFound(detail=error_message)
+
+    if not data["dry_run"]:
+        ti.set_state(data["new_state"], session=session)
+
+    return task_instance_reference_schema.dump(ti)
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def patch_set_task_instance_state(*, dag_id: str, dag_run_id: str, task_id: str, session: Session = NEW_SESSION) -> APIResponse:

Review Comment:
   Yes, good point. I'll change it.


