Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/06/12 10:33:47 UTC

[GitHub] [airflow] Bowrna opened a new pull request, #23516: clear specific dag run TI

Bowrna opened a new pull request, #23516:
URL: https://github.com/apache/airflow/pull/23516

   closes: #23227
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards-incompatible changes, please leave a note in a newsfragment file, named `{pr_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] Bowrna commented on pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on PR #23516:
URL: https://github.com/apache/airflow/pull/23516#issuecomment-1146168257

   > Yes, we need those. This should act as a replacement for the current endpoint the UI uses.
   
   Then I have to extract only the task_ids (excluding the map_index in the tuple), send those to the query, and find the upstream or downstream task_ids. Is my understanding right? @bbovenzi
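A minimal sketch of that extraction, assuming the UI sends a mix of plain task IDs and `(task_id, map_index)` tuples (the `List[Union[str, Tuple[str, int]]]` shape used later in this PR). The helper name `plain_task_ids` is hypothetical, not an Airflow function.

```python
from typing import List, Tuple, Union

# A task reference is either a plain task ID or a (task_id, map_index) pair.
TaskRef = Union[str, Tuple[str, int]]


def plain_task_ids(task_refs: List[TaskRef]) -> List[str]:
    """Drop map indexes and de-duplicate, keeping first-seen order."""
    seen = set()
    result: List[str] = []
    for ref in task_refs:
        # Mapped task instances arrive as (task_id, map_index) tuples.
        task_id = ref[0] if isinstance(ref, tuple) else ref
        if task_id not in seen:
            seen.add(task_id)
            result.append(task_id)
    return result


print(plain_task_ids(["extract", ("transform", 0), ("transform", 1), "load"]))
# ['extract', 'transform', 'load']
```

The de-duplicated IDs are what an upstream/downstream lookup would need, since graph relationships are defined per task, not per map index.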




[GitHub] [airflow] Bowrna commented on pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on PR #23516:
URL: https://github.com/apache/airflow/pull/23516#issuecomment-1152132646

   > Yes, we'll need all of the following boolean params to send to the query:
   > 
   > include_upstream
   > include_downstream
   > include_future
   > include_past
   > recursive
   
   @bbovenzi Instead of a `recursive` field, the existing API for `clearTaskInstances` has the params `include_subdags` and `include_parentdag`. The older API in views.py uses the value of the `recursive` param to fill in both `include_subdags` and `include_parentdag`.
   
   Do you still think the `recursive` field has to be given as a query param?
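Because the older view reuses one value for both flags, a single `recursive` value maps directly onto the two existing params. A minimal sketch of that translation, in case a client still sends `recursive`; `expand_recursive` is a hypothetical helper, not part of Airflow.

```python
from typing import Any, Dict


def expand_recursive(params: Dict[str, Any]) -> Dict[str, Any]:
    """Replace a legacy ``recursive`` flag with the two flags dag.clear() takes.

    Explicit include_subdags / include_parentdag values, if present, win.
    """
    out = dict(params)  # do not mutate the caller's dict
    recursive = bool(out.pop("recursive", False))
    out.setdefault("include_subdags", recursive)
    out.setdefault("include_parentdag", recursive)
    return out


print(expand_recursive({"recursive": True, "dry_run": True}))
# {'dry_run': True, 'include_subdags': True, 'include_parentdag': True}
```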




[GitHub] [airflow] uranusjr commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r930617885


##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -1218,6 +1218,8 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+
+

Review Comment:
   Accidental?



##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3410,6 +3412,32 @@ components:
           description: Set state of DAG runs to RUNNING.
           type: boolean
 
+        dag_run_id:
+          type: string
+          description:  The DagRun ID for this task instance
+          nullable: true
+
+        include_upstream:
+          description: If set to true, upstream tasks are also affected.
+          type: boolean
+          default: false
+
+        include_downstream:
+          description: If set to true, downstream tasks are also affected.
+          type: boolean
+          default: false
+
+        include_future:
+          description: If set to True, also tasks from future DAG Runs are affected.
+          type: boolean
+          default: false
+
+        include_past:
+          description: If set to True, also tasks from past DAG Runs are affected.
+          type: boolean
+          default: false
+

Review Comment:
   Also you need to add these to the Marshmallow schema (you already added `dag_run_id` but not the others).
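This matches the `Unknown field` error reported elsewhere in this thread: the endpoint passes the request body to a Marshmallow schema's `load()`, and Marshmallow 3 rejects undeclared keys by default (`unknown=RAISE`), even if `v1.yaml` already lists them. The stdlib sketch below only imitates that strict-loading behaviour; `StrictBoolSchema` and its defaults are hypothetical stand-ins, not Marshmallow's API.

```python
from typing import Any, Dict, Mapping


class StrictBoolSchema:
    """Hypothetical stand-in for a Marshmallow schema with unknown=RAISE."""

    def __init__(self, defaults: Mapping[str, bool]):
        # Declared boolean fields, each with the default used when absent.
        self.defaults = dict(defaults)

    def load(self, payload: Mapping[str, Any]) -> Dict[str, bool]:
        unknown = sorted(set(payload) - set(self.defaults))
        if unknown:
            # Marshmallow reports each undeclared key as "Unknown field."
            raise ValueError({key: ["Unknown field."] for key in unknown})
        # Values are assumed to already be JSON booleans in this sketch.
        return {
            name: bool(payload.get(name, default))
            for name, default in self.defaults.items()
        }


# Defaults are illustrative. The spec lists include_upstream; this schema does not.
spec_only = StrictBoolSchema({"dry_run": True, "reset_dag_runs": False})
# After adding the field to the schema as well, the same payload loads.
both = StrictBoolSchema(
    {"dry_run": True, "reset_dag_runs": False, "include_upstream": False}
)
print(both.load({"include_upstream": True}))
# {'dry_run': True, 'reset_dag_runs': False, 'include_upstream': True}
```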





[GitHub] [airflow] uranusjr commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r922996589


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -433,10 +433,38 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
     if not dag:
         error_message = f"Dag id {dag_id} not found"
         raise NotFound(error_message)
-    reset_dag_runs = data.pop('reset_dag_runs')
-    dry_run = data.pop('dry_run')
-    # We always pass dry_run here, otherwise this would try to confirm on the terminal!
-    task_instances = dag.clear(dry_run=True, dag_bag=current_app.dag_bag, **data)
+    reset_dag_runs = data.pop('reset_dag_runs') == "true"
+    dry_run = data.pop('dry_run') == "true"
+    dag_run_id = data.pop('dag_run_id', None)
+    future = data.pop('include_future', "false") == "true"
+    past = data.pop('include_past', "false") == "true"
+    downstream = data.pop('include_downstream', "false") == "true"
+    upstream = data.pop('include_upstream', "false") == "true"
+    if dag_run_id is not None:
+        dag_run: Optional[DR] = (
+            session.query(DR).filter(DR.dag_id == dag_id, DR.run_id == dag_run_id).one_or_none()
+        )
+        if dag_run is None:
+            error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
+            raise NotFound(error_message)
+        data['start_date'] = dag_run.logical_date
+        data['end_date'] = dag_run.logical_date

Review Comment:
   This would cause a `NameError` if `dag_run_id` _is_ None.
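One way to avoid this class of error is to make every path either return a fully bound result or raise, so nothing defined inside the `if` is used after it. The sketch assumes, as in the hunk, that one run is selected by setting both dates to its logical date; widening a side to `None` for `include_past`/`include_future` is an assumption loosely following the older UI view. `resolve_clear_range`, `NotFound` and the `logical_dates` dict (standing in for the `DR` query) are hypothetical.

```python
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple


class NotFound(Exception):
    """Stand-in for the API's 404 error."""


DateRange = Tuple[Optional[datetime], Optional[datetime]]


def resolve_clear_range(
    dag_id: str,
    dag_run_id: Optional[str],
    logical_dates: Dict[Tuple[str, str], datetime],
    body_range: DateRange = (None, None),
    include_past: bool = False,
    include_future: bool = False,
) -> DateRange:
    """Return (start_date, end_date) for dag.clear(); None means unbounded."""
    if dag_run_id is None:
        # No run given: keep whatever range the request body supplied.
        return body_range
    # logical_dates stands in for the DR query keyed by (dag_id, run_id).
    logical_date = logical_dates.get((dag_id, dag_run_id))
    if logical_date is None:
        raise NotFound(f"Dag Run id {dag_run_id} not found in dag {dag_id}")
    start_date = None if include_past else logical_date
    end_date = None if include_future else logical_date
    return start_date, end_date


runs = {("example_dag", "manual__1"): datetime(2022, 6, 1, tzinfo=timezone.utc)}
start, end = resolve_clear_range("example_dag", "manual__1", runs, include_future=True)
```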





[GitHub] [airflow] Bowrna commented on pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on PR #23516:
URL: https://github.com/apache/airflow/pull/23516#issuecomment-1159700154

   @bbovenzi @ephraimbuddy Gentle reminder.
   
   I have added the params in `v1.yaml`, but when I send these params to test the API, I get an error saying `Unknown field`. Has anyone faced this issue? Can you tell me where I am going wrong?




[GitHub] [airflow] Bowrna commented on pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on PR #23516:
URL: https://github.com/apache/airflow/pull/23516#issuecomment-1205246131

   Could this PR be reviewed? @uranusjr @bbovenzi @ephraimbuddy 




[GitHub] [airflow] Bowrna commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r867335567


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -449,6 +450,93 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
         TaskInstanceReferenceCollection(task_instances=task_instances.all())
     )
 
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def post_clear_task_dag_run_instances(
+    *, 
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str, 
+    session: Session = NEW_SESSION,
+    ) -> APIResponse:
+    """Clear task instances for given dag run."""
+    dag = current_app.dag_bag.get_dag(dag_id)

Review Comment:
   @bbovenzi Could you tell me whether the checks are OK, or do I have to make any changes?





[GitHub] [airflow] uranusjr commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r936457510


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -434,10 +436,39 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
     if not dag:
         error_message = f"Dag id {dag_id} not found"
         raise NotFound(error_message)
+    print('Bowrna:',data)
     reset_dag_runs = data.pop('reset_dag_runs')
     dry_run = data.pop('dry_run')
-    # We always pass dry_run here, otherwise this would try to confirm on the terminal!

Review Comment:
   Need to keep this comment





[GitHub] [airflow] uranusjr commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r922996093


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -433,10 +433,38 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
     if not dag:
         error_message = f"Dag id {dag_id} not found"
         raise NotFound(error_message)
-    reset_dag_runs = data.pop('reset_dag_runs')
-    dry_run = data.pop('dry_run')
-    # We always pass dry_run here, otherwise this would try to confirm on the terminal!
-    task_instances = dag.clear(dry_run=True, dag_bag=current_app.dag_bag, **data)
+    reset_dag_runs = data.pop('reset_dag_runs') == "true"

Review Comment:
   There is a `to_boolean` function we can use to better parse this.
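A stdlib sketch of the kind of parsing such a helper does; `parse_flag` and its accepted spellings are assumptions, so check Airflow's own `to_boolean` before relying on the exact set. It also shows why `== "true"` is fragile: `"True"` compares false, and if the Marshmallow schema has already turned the field into a Python `bool`, `True == "true"` is `False` as well.

```python
from typing import Optional, Union

# Accepted "true" spellings: an assumption for this sketch.
_TRUTHY = frozenset({"t", "true", "y", "yes", "1"})


def parse_flag(value: Optional[Union[str, bool]]) -> bool:
    """Parse a request value into a bool, tolerant of case and real bools."""
    if isinstance(value, bool):
        # Already deserialized by the schema: pass it through unchanged.
        return value
    if value is None:
        return False
    return value.strip().lower() in _TRUTHY


print(parse_flag("True"), parse_flag(True), parse_flag("false"), parse_flag(None))
# True True False False
```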





[GitHub] [airflow] bbovenzi commented on pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on PR #23516:
URL: https://github.com/apache/airflow/pull/23516#issuecomment-1151404839

   > Then I have to extract only the task_ids (excluding the map_index in the tuple), send those to the query, and find the upstream or downstream task_ids. Is my understanding right? @bbovenzi
   
   Yes, we'll need all of the following boolean params to send to the query:
   
   include_upstream
   include_downstream
   include_future
   include_past
   recursive
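To illustrate what `include_upstream` and `include_downstream` do to the set of tasks to clear, here is a stdlib sketch over a plain adjacency mapping (`include_future` and `include_past` act on the date range instead). It only imitates the idea behind `DAG.partial_subset`, collecting transitive upstream and/or downstream relatives of the selected task; the graph and the `tasks_to_clear` name are hypothetical.

```python
from collections import deque
from typing import Dict, List, Set


def _reachable(start: str, edges: Dict[str, List[str]]) -> Set[str]:
    """All tasks reachable from start by following edges (excluding start)."""
    seen: Set[str] = set()
    queue = deque(edges.get(start, []))
    while queue:
        task = queue.popleft()
        if task not in seen:
            seen.add(task)
            queue.extend(edges.get(task, []))
    return seen


def tasks_to_clear(
    task_id: str,
    downstream: Dict[str, List[str]],
    include_upstream: bool = False,
    include_downstream: bool = False,
) -> Set[str]:
    """Return task_id plus its transitive relatives, as requested."""
    # Invert the downstream map once so upstream walks reuse the same helper.
    upstream: Dict[str, List[str]] = {}
    for parent, children in downstream.items():
        for child in children:
            upstream.setdefault(child, []).append(parent)
    selected = {task_id}
    if include_upstream:
        selected |= _reachable(task_id, upstream)
    if include_downstream:
        selected |= _reachable(task_id, downstream)
    return selected


# extract -> transform -> load, and extract -> audit
graph = {"extract": ["transform", "audit"], "transform": ["load"]}
print(sorted(tasks_to_clear("transform", graph, include_upstream=True)))
# ['extract', 'transform']
```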
    




[GitHub] [airflow] Bowrna commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r868046251


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -450,6 +451,99 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
     )
 
 
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def post_clear_task_dag_run_instances(
+    *,
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    session: Session = NEW_SESSION,
+) -> APIResponse:
+    """Clear task instances for given dag run."""
+    dag = current_app.dag_bag.get_dag(dag_id)
+    if not dag:
+        error_message = f"Dag id {dag_id} not found"
+        raise NotFound(error_message)
+
+    dag_run: Optional[DR] = (
+        session.query(DR).filter(DR.dag_id == dag_id, DR.run_id == dag_run_id).one_or_none()
+    )
+    if dag_run is None:
+        error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
+        raise NotFound(error_message)
+
+    ti = (
+        session.query(TI)
+        .filter(
+            TI.task_id == task_id,
+            TI.dag_id == dag_id,
+            TI.run_id == dag_run_id,
+        )
+        .join(TI.dag_run)
+        .one_or_none()
+    )
+    if ti is None:
+        raise NotFound(title="TaskInstance not found")
+
+    try:
+        data = clear_task_instance_dag_run_form.load(request.json)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+    import pdb;pdb.set_trace()
+    reset_dag_runs = data.pop('reset_dag_runs')
+    dry_run = data.pop('dry_run')
+    only_failed = data.pop('only_failed')
+    recursive = data.pop('recursive')
+    map_indexes = data.pop('map_index')
+
+    task_ids: List[Union[str, Tuple[str, int]]]
+    if map_indexes is None:
+        task_ids = [task_id]
+    else:
+        task_ids = [(task_id, map_index) for map_index in map_indexes]
+
+    dag = dag.partial_subset(
+        task_ids_or_regex=[task_id],
+        include_downstream=data['include_downstream'],
+        include_upstream=data['include_upstream'],
+    )
+    if len(dag.task_ids) > 1:
+        # If we had upstream/downstream etc then also include those!
+        task_ids.extend(tid for tid in dag.task_ids if tid != task_id)
+
+    if dry_run:
+        task_instances = dag.clear(
+            start_date=None,
+            end_date=None,
+            task_ids=task_ids,
+            include_subdags=recursive,
+            include_parentdag=recursive,
+            only_failed=only_failed,
+            dry_run=True,
+        )
+        return task_instance_reference_collection_schema.dump(
+        TaskInstanceReferenceCollection(task_instances=task_instances.all())
+        )
+    else:
+        task_instance = dag.clear(
+            start_date=None,
+            end_date=None,
+            task_ids=task_ids,
+            include_subdags=recursive,
+            include_parentdag=recursive,
+            only_failed=only_failed,
+        )

Review Comment:
   OK @ephraimbuddy, how do I proceed? Can I change the code added in https://github.com/apache/airflow/pull/23451 to accept task_id as a param?





[GitHub] [airflow] bbovenzi commented on pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on PR #23516:
URL: https://github.com/apache/airflow/pull/23516#issuecomment-1152388565

   > @bbovenzi Instead of a `recursive` field, the existing API for `clearTaskInstances` has the params `include_subdags` and `include_parentdag`. The older API in views.py uses the value of the `recursive` param to fill in both `include_subdags` and `include_parentdag`.
   > 
   > Do you still think the `recursive` field has to be given as a query param?
   
   Ah, I didn't realize that. We don't need `recursive`, then.




[GitHub] [airflow] Bowrna commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r895731512


##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3410,6 +3412,32 @@ components:
           description: Set state of DAG runs to RUNNING.
           type: boolean
 
+        dag_run_id:
+          type: string
+          description:  The DagRun ID for this task instance
+          nullable: true
+
+        include_upstream:
+          description: If set to true, upstream tasks are also affected.
+          type: boolean
+          default: false
+
+        include_downstream:
+          description: If set to true, downstream tasks are also affected.
+          type: boolean
+          default: false
+
+        include_future:
+          description: If set to True, also tasks from future DAG Runs are affected.
+          type: boolean
+          default: false
+
+        include_past:
+          description: If set to True, also tasks from past DAG Runs are affected.
+          type: boolean
+          default: false
+

Review Comment:
   @bbovenzi @ephraimbuddy Could anyone help me solve this issue? It would be very helpful.





[GitHub] [airflow] bbovenzi commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r916922158


##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3410,6 +3412,32 @@ components:
           description: Set state of DAG runs to RUNNING.
           type: boolean
 
+        dag_run_id:
+          type: string
+          description:  The DagRun ID for this task instance
+          nullable: true
+
+        include_upstream:
+          description: If set to true, upstream tasks are also affected.
+          type: boolean
+          default: false
+
+        include_downstream:
+          description: If set to true, downstream tasks are also affected.
+          type: boolean
+          default: false
+
+        include_future:
+          description: If set to True, also tasks from future DAG Runs are affected.
+          type: boolean
+          default: false
+
+        include_past:
+          description: If set to True, also tasks from past DAG Runs are affected.
+          type: boolean
+          default: false
+

Review Comment:
   Sorry, I am not too familiar with this side of things, but I don't seem to see `dry_run` in this file. Could that be it?





[GitHub] [airflow] Bowrna commented on pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on PR #23516:
URL: https://github.com/apache/airflow/pull/23516#issuecomment-1144708482

   @bbovenzi @ephraimbuddy Could you share your reviews/feedback on this PR when you get time?




[GitHub] [airflow] bbovenzi commented on pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on PR #23516:
URL: https://github.com/apache/airflow/pull/23516#issuecomment-1146000003

   > @bbovenzi @ephraimbuddy Could you share your reviews/feedback on this PR when you get time?
   > 
   > Also, please let me know whether we have to include the `include_upstream` and `include_downstream` params as part of this API. The existing API in views.py for `/clear` supports these parameters, which is why I want to know whether we have to add them to this API. Thanks!
   
   Yes, we need those. This should act as a replacement for the current endpoint the UI uses. 




[GitHub] [airflow] potiuk merged pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
potiuk merged PR #23516:
URL: https://github.com/apache/airflow/pull/23516




[GitHub] [airflow] Bowrna commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r867521752


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -450,6 +451,99 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
     )
 
 
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def post_clear_task_dag_run_instances(
+    *,
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    session: Session = NEW_SESSION,
+) -> APIResponse:
+    """Clear task instances for given dag run."""
+    dag = current_app.dag_bag.get_dag(dag_id)
+    if not dag:
+        error_message = f"Dag id {dag_id} not found"
+        raise NotFound(error_message)
+
+    dag_run: Optional[DR] = (
+        session.query(DR).filter(DR.dag_id == dag_id, DR.run_id == dag_run_id).one_or_none()
+    )
+    if dag_run is None:
+        error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
+        raise NotFound(error_message)
+
+    ti = (
+        session.query(TI)
+        .filter(
+            TI.task_id == task_id,
+            TI.dag_id == dag_id,
+            TI.run_id == dag_run_id,
+        )
+        .join(TI.dag_run)
+        .one_or_none()
+    )
+    if ti is None:
+        raise NotFound(title="TaskInstance not found")
+
+    try:
+        data = clear_task_instance_dag_run_form.load(request.json)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+    import pdb;pdb.set_trace()
+    reset_dag_runs = data.pop('reset_dag_runs')
+    dry_run = data.pop('dry_run')
+    only_failed = data.pop('only_failed')
+    recursive = data.pop('recursive')
+    map_indexes = data.pop('map_index')
+
+    task_ids: List[Union[str, Tuple[str, int]]]
+    if map_indexes is None:
+        task_ids = [task_id]
+    else:
+        task_ids = [(task_id, map_index) for map_index in map_indexes]
+
+    dag = dag.partial_subset(
+        task_ids_or_regex=[task_id],
+        include_downstream=data['include_downstream'],
+        include_upstream=data['include_upstream'],
+    )
+    if len(dag.task_ids) > 1:
+        # If we had upstream/downstream etc then also include those!
+        task_ids.extend(tid for tid in dag.task_ids if tid != task_id)
+
+    if dry_run:
+        task_instances = dag.clear(
+            start_date=None,
+            end_date=None,
+            task_ids=task_ids,
+            include_subdags=recursive,
+            include_parentdag=recursive,
+            only_failed=only_failed,
+            dry_run=True,
+        )
+        return task_instance_reference_collection_schema.dump(
+        TaskInstanceReferenceCollection(task_instances=task_instances.all())
+        )
+    else:
+        task_instance = dag.clear(
+            start_date=None,
+            end_date=None,
+            task_ids=task_ids,
+            include_subdags=recursive,
+            include_parentdag=recursive,
+            only_failed=only_failed,
+        )

Review Comment:
   @ephraimbuddy Could you tell me what the right return data would be here? The old code in `views.py` returns the number of task instances cleared. Should I return something similar here?
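One option, sketched under assumptions, is for the non-dry-run branch to return the same kind of task-instance reference collection as the dry-run branch, so the caller sees exactly what was cleared, with the count the old view reported still easy to derive. `TaskInstanceRef`, `clear_response` and the `total_entries` key are hypothetical stand-ins, not the API's actual schema.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class TaskInstanceRef:
    """Hypothetical stand-in for one task-instance reference in the response."""

    dag_id: str
    dag_run_id: str
    task_id: str


def clear_response(cleared: List[TaskInstanceRef]) -> Dict[str, Any]:
    """Build the same payload for the dry-run and the real clear."""
    return {
        "task_instances": [asdict(ti) for ti in cleared],
        # Hypothetical extra field: the count the old views.py endpoint reported.
        "total_entries": len(cleared),
    }


refs = [
    TaskInstanceRef("example_dag", "manual__1", "extract"),
    TaskInstanceRef("example_dag", "manual__1", "load"),
]
print(clear_response(refs)["total_entries"])  # 2
```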





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r867938334


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -450,6 +451,99 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
     )
 
 
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+        (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_TASK_INSTANCE),
+    ],
+)
+@provide_session
+def post_clear_task_dag_run_instances(
+    *,
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    session: Session = NEW_SESSION,
+) -> APIResponse:
+    """Clear task instances for given dag run."""
+    dag = current_app.dag_bag.get_dag(dag_id)
+    if not dag:
+        error_message = f"Dag id {dag_id} not found"
+        raise NotFound(error_message)
+
+    dag_run: Optional[DR] = (
+        session.query(DR).filter(DR.dag_id == dag_id, DR.run_id == dag_run_id).one_or_none()
+    )
+    if dag_run is None:
+        error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
+        raise NotFound(error_message)
+
+    ti = (
+        session.query(TI)
+        .filter(
+            TI.task_id == task_id,
+            TI.dag_id == dag_id,
+            TI.run_id == dag_run_id,
+        )
+        .join(TI.dag_run)
+        .one_or_none()
+    )
+    if ti is None:
+        raise NotFound(title="TaskInstance not found")
+
+    try:
+        data = clear_task_instance_dag_run_form.load(request.json)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err.messages))
+    import pdb;pdb.set_trace()
+    reset_dag_runs = data.pop('reset_dag_runs')
+    dry_run = data.pop('dry_run')
+    only_failed = data.pop('only_failed')
+    recursive = data.pop('recursive')
+    map_indexes = data.pop('map_index')
+
+    task_ids: List[Union[str, Tuple[str, int]]]
+    if map_indexes is None:
+        task_ids = [task_id]
+    else:
+        task_ids = [(task_id, map_index) for map_index in map_indexes]
+
+    dag = dag.partial_subset(
+        task_ids_or_regex=[task_id],
+        include_downstream=data['include_downstream'],
+        include_upstream=data['include_upstream'],
+    )
+    if len(dag.task_ids) > 1:
+        # If we had upstream/downstream etc then also include those!
+        task_ids.extend(tid for tid in dag.task_ids if tid != task_id)
+
+    if dry_run:
+        task_instances = dag.clear(
+            start_date=None,
+            end_date=None,
+            task_ids=task_ids,
+            include_subdags=recursive,
+            include_parentdag=recursive,
+            only_failed=only_failed,
+            dry_run=True,
+        )
+        return task_instance_reference_collection_schema.dump(
+        TaskInstanceReferenceCollection(task_instances=task_instances.all())
+        )
+    else:
+        task_instance = dag.clear(
+            start_date=None,
+            end_date=None,
+            task_ids=task_ids,
+            include_subdags=recursive,
+            include_parentdag=recursive,
+            only_failed=only_failed,
+        )

Review Comment:
   See my thoughts in https://github.com/apache/airflow/issues/23227#issuecomment-1120740197





[GitHub] [airflow] Bowrna commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r924054573


##########
airflow/api_connexion/endpoints/task_instance_endpoint.py:
##########
@@ -433,10 +433,38 @@ def post_clear_task_instances(*, dag_id: str, session: Session = NEW_SESSION) ->
     if not dag:
         error_message = f"Dag id {dag_id} not found"
         raise NotFound(error_message)
-    reset_dag_runs = data.pop('reset_dag_runs')
-    dry_run = data.pop('dry_run')
-    # We always pass dry_run here, otherwise this would try to confirm on the terminal!
-    task_instances = dag.clear(dry_run=True, dag_bag=current_app.dag_bag, **data)
+    reset_dag_runs = data.pop('reset_dag_runs') == "true"

Review Comment:
   I will check this, @uranusjr.





[GitHub] [airflow] Bowrna commented on a diff in pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna commented on code in PR #23516:
URL: https://github.com/apache/airflow/pull/23516#discussion_r895147259


##########
airflow/api_connexion/openapi/v1.yaml:
##########
@@ -3410,6 +3412,32 @@ components:
           description: Set state of DAG runs to RUNNING.
           type: boolean
 
+        dag_run_id:
+          type: string
+          description:  The DagRun ID for this task instance
+          nullable: true
+
+        include_upstream:
+          description: If set to true, upstream tasks are also affected.
+          type: boolean
+          default: false
+
+        include_downstream:
+          description: If set to true, downstream tasks are also affected.
+          type: boolean
+          default: false
+
+        include_future:
+          description: If set to True, also tasks from future DAG Runs are affected.
+          type: boolean
+          default: false
+
+        include_past:
+          description: If set to True, also tasks from past DAG Runs are affected.
+          type: boolean
+          default: false
+

Review Comment:
   @bbovenzi @ephraimbuddy I have added the params, but when I send these params to test the API, I get an error saying `Unknown field`. Has anyone faced this issue? Can you tell me where I am going wrong?





[GitHub] [airflow] Bowrna closed pull request #23516: clear specific dag run TI

Posted by GitBox <gi...@apache.org>.
Bowrna closed pull request #23516: clear specific dag run TI
URL: https://github.com/apache/airflow/pull/23516

