Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/26 00:44:20 UTC

[GitHub] [airflow] bbenshalom opened a new pull request #17839: Add DAG run abort endpoint

bbenshalom opened a new pull request #17839:
URL: https://github.com/apache/airflow/pull/17839


   Fixes #15888 
   
   The new endpoint will set the DAG run state and all of the task instances' state to FAILED.
   I couldn't find any tests for the endpoints directly, so I did not add a unit test... Please let me know if I missed their location and I'll happily add them!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r698324454



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -104,12 +104,10 @@ def autofill(self, data, **kwargs):
         return data
 
 
-class SetDagRunStateFormSchema(Schema):
+class SetDagRunStateFormSchema(SQLAlchemySchema):

Review comment:
       I think this should be `Schema`; this is not a SQLAlchemy model.







[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700016667



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,41 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/state:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:
+      summary: Set a state of DAG run
+      description: >
+        Set a state of DAG run

Review comment:
       ```suggestion
         description: Set a state of DAG run
   ```
   
   (Nitpicking) No need for the extra wrap.

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1156,111 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand([("failed",), ("success",)])
+    @freeze_time(TestDagRunEndpoint.default_time)
+    def test_should_respond_200(self, state):
+        dag_id = "TEST_DAG_ID"
+        dag_run_id = "TEST_DAG_RUN_ID_1"
+        test_time = timezone.parse(self.default_time)
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = DagRun(
+                dag_id=dag_id,
+                run_id=dag_run_id,
+                state=DagRunState.RUNNING,
+                run_type=DagRunType.MANUAL,
+                execution_date=test_time,
+                start_date=test_time,
+                external_trigger=True,
+            )
+            session.add(dag)
+            session.add(dag_run)

Review comment:
       It’s probably easier to use the `dag_maker` fixture for this (but the current implementation is OK).

##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -104,6 +105,12 @@ def autofill(self, data, **kwargs):
         return data
 
 
+class SetDagRunStateFormSchema(Schema):
+    """Schema for handling the request of setting state of DAG run"""
+
+    state = DagStateField(validate=validate.OneOf([State.SUCCESS, State.FAILED]))

Review comment:
       Nitpick: Use `DagRunState.SUCCESS.value` (and `FAILED`) instead.
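
A minimal sketch of the idea behind this nitpick, using only the standard library. The `DagRunState` enum below is a hypothetical stand-in for Airflow's own enum (its members are assumed from the states named in this thread), and `validate_state` is an illustrative helper, not part of Airflow or marshmallow. It shows the accepted values being taken from the enum's `.value` so they cannot drift from its definition:

```python
from enum import Enum


class DagRunState(str, Enum):
    """Hypothetical stand-in for Airflow's DagRunState enum."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Only the two states accepted by the endpoint; the plain strings come
# from the enum members rather than being typed out by hand.
ALLOWED_STATES = [DagRunState.SUCCESS.value, DagRunState.FAILED.value]


def validate_state(value: str) -> str:
    """Illustrative helper: reject anything outside ALLOWED_STATES."""
    if value not in ALLOWED_STATES:
        raise ValueError(f"Must be one of: {', '.join(ALLOWED_STATES)}.")
    return value
```

In the real schema the same list would presumably be passed to the `OneOf` validator shown in the diff above.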

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -271,3 +275,29 @@ def post_dag_run(dag_id, session):
         )
 
     raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+    ]
+)
+@provide_session
+def update_dag_run_state(dag_id: str, dag_run_id: str, session) -> dict:
+    """Set a state of a dag run."""
+    dag_run: Optional[DagRun] = (
+        session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()
+    )
+    if dag_run is None:
+        error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
+        raise NotFound(error_message)
+    try:
+        post_body = set_dagrun_state_form_schema.load(request.json)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+
+    state = post_body['state']
+    dag_run.set_state(state=DagRunState(state).value)

Review comment:
       ```suggestion
       dag_run.set_state(state=DagRunState(state))
   ```
   
   `set_state` accepts a `DagRunState`, not `str`.
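
A minimal sketch of why the `.value` call gets in the way, assuming a string-backed enum. `DagRunState` and `FakeDagRun` below are hypothetical stand-ins, and the runtime type check exists only to make the mismatch visible; nothing here claims the real `set_state` enforces its annotation at runtime:

```python
from enum import Enum


class DagRunState(str, Enum):
    """Hypothetical stand-in for Airflow's DagRunState enum."""

    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


class FakeDagRun:
    """Illustrative model whose set_state expects an enum member."""

    def __init__(self) -> None:
        self.state = DagRunState.RUNNING

    def set_state(self, state: DagRunState) -> None:
        # Check added for illustration only: a plain str is rejected.
        if not isinstance(state, DagRunState):
            raise TypeError(f"expected DagRunState, got {type(state).__name__}")
        self.state = state


dag_run = FakeDagRun()
# Look the member up by value; an unknown string raises ValueError here.
dag_run.set_state(DagRunState("failed"))
```

Calling `DagRunState(state).value` converts the member straight back into a plain string, which is what the suggestion removes.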

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -1996,6 +2031,16 @@ components:
       required:
         - dag_id
 
+    UpdateDagRunState:
+      type: object
+      properties:
+        state:
+          description: The state to set this DagRun
+          type: string
+          enum:
+            - success
+            - failed

Review comment:
       I wonder if it makes sense to set the state to other valid values. But I guess those can be added later if someone asks for them, so they should not block this PR.







[GitHub] [airflow] mik-laj commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700825889



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,40 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/state:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:

Review comment:
       ```suggestion
     /dags/{dag_id}/dagRuns/{dag_run_id}:
       parameters:
         - $ref: '#/components/parameters/DAGID'
         - $ref: '#/components/parameters/DAGRunID'
   
       patch:
   ```
   This endpoint modifies an existing object, so according to the REST convention, we should not create a new endpoint, but change the method type to PATCH.
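
From a client's point of view, the suggested shape could look like the sketch below. It only builds the request with the standard library and does not send it. The host, the `/api/v1` prefix, and the DAG and run identifiers are assumptions made for illustration:

```python
import json
from urllib.request import Request

# Hypothetical host and identifiers; the /api/v1 prefix is an assumption.
base_url = "http://localhost:8080/api/v1"
dag_id = "example_dag"
dag_run_id = "manual_run_1"

# PATCH on the existing DAG run resource, with only the field to change.
request = Request(
    url=f"{base_url}/dags/{dag_id}/dagRuns/{dag_run_id}",
    data=json.dumps({"state": "failed"}).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="PATCH",
)
# urllib.request.urlopen(request) would send it; omitted so the sketch
# runs without a server.
```

The resource URL stays the same as for reading the DAG run; only the HTTP method signals that the object is being modified.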







[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700507659



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1154,91 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand([("failed",), ("success",)])
+    @pytest.fixture(scope="module")
+    def test_should_respond_200(self, state, dag_maker):
+        dag_id = "TEST_DAG_ID"
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = dag_maker.create_dagrun()
+            session.add(dag)
+            session.add(dag_run)

Review comment:
       > How can I inject dag_maker […]?
   
   Just include `dag_maker` as an argument. Pytest magically picks the argument up and injects the fixture into the function. But to make the test compatible with pytest, you need to use `pytest.mark.parametrize` instead of `parameterized.expand`.
   
   So something like
   
   ```python
   @pytest.mark.parametrize("state", [...])
   def test_should_respond_200(self, dag_maker, state):
       ...
   ```
   
   > What is the purpose of DummyOperator here, and why run it separately in a context manager from `dag_maker.create_dagrun()`?
   
   The DummyOperator will be included as a part of the DAG, similar to
   
   ```python
   with DAG(...):
       MyOperator(...)
   ```
   
   It’s not actually used anywhere in the test, so technically you can just do
   
   ```python
   with dag_maker(...):
       pass
   ```
   
   A DAG without any operators is kind of weird, but allowed.







[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r697676120



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -281,14 +281,14 @@ def post_dag_run(dag_id, session):
     ]
 )
 @provide_session
-def post_set_dag_run_state(dag_id, session) -> dict:
+def post_set_dag_run_state(dag_id: str, session) -> dict:
     """Set a state of a dag run."""
     try:
         post_body = dagrun_schema.load(request.json, session=session, unknown="include")
     except ValidationError as err:
         raise BadRequest(detail=str(err))
     dag_run_id, state = post_body['run_id'], post_body['state']
-    dag_run = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()
+    dag_run: DagRun = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()

Review comment:
       ```suggestion
       dag_run: Optional[DagRun] = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()
   ```







[GitHub] [airflow] bbenshalom commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
bbenshalom commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700901586



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -104,6 +105,12 @@ def autofill(self, data, **kwargs):
         return data
 
 
+class SetDagRunStateFormSchema(Schema):

Review comment:
       Considering @uranusjr's comment, I'll resolve this conversation without changing the superclass here (let me know if you still want it done).







[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r697918061



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -104,6 +104,14 @@ def autofill(self, data, **kwargs):
         return data
 
 
+class SetDagRunStateFormSchema(Schema):
+    """Schema for handling the request of setting state of DAG run"""
+
+    run_id = auto_field(data_key='dag_run_id')
+    dag_id = auto_field(dump_only=True)
+    state = DagStateField(dump_only=True)

Review comment:
       ```suggestion
   class SetDagRunStateFormSchema(Schema):
       """Schema for handling the request of setting state of DAG run"""
   
       state = DagStateField()
   ```
   
   We don’t have these fields in the schema now. Also, `dump_only` means this field cannot be included in a POST form, which is the opposite of what you want. Technically you can add `load_only=True`, but since this entire schema is only used to load data anyway, the flag would not practically change anything.







[GitHub] [airflow] mik-laj commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700829408



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -104,6 +105,12 @@ def autofill(self, data, **kwargs):
         return data
 
 
+class SetDagRunStateFormSchema(Schema):

Review comment:
       Can we use DAGRunSchema schema here? I think it is possible, but we need to carefully review which fields are marked as input and which are marked as output.







[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r698323760



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -278,19 +280,24 @@ def post_dag_run(dag_id, session):
 @security.requires_access(
     [
         (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN)
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
     ]
 )
 @provide_session
 def post_set_dag_run_state(dag_id: str, dag_run_id: str, session) -> dict:
     """Set a state of a dag run."""
-    dag_run: Optional[DagRun] = session.query(DagRun) \
-        .filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()
+    dag_run: Optional[DagRun] = (
+        session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()
+    )
     if dag_run is None:
         error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
         raise NotFound(error_message)
+    try:
+        post_body = set_dagrun_state_form_schema.load(request.json, session=session)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
 
-    state = request.json['state']
-    dag_run.set_state(state=DagRunState(state.lower()).value)
+    state = post_body['state']
+    dag_run.set_state(state=state)

Review comment:
       I think this still needs a conversion
   
   ```suggestion
       dag_run.set_state(state=DagRunState(state))
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700873170



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,40 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/state:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:
+      summary: Set a state of DAG run
+      description: Set a state of DAG run
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
+      operationId: update_dag_run_state
+      tags: [UpdateDagRunState]
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/UpdateDagRunState'

Review comment:
       That would leave room for updating the state to any value. The state on DAGRun is read-only; making it accept a value would allow setting the DAG run to running through the API.







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-911993338


   Awesome work, congrats on your first merged pull request!
   





[GitHub] [airflow] mik-laj commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700827968



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,40 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/state:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:
+      summary: Set a state of DAG run
+      description: Set a state of DAG run
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
+      operationId: update_dag_run_state
+      tags: [UpdateDagRunState]
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/UpdateDagRunState'

Review comment:
       Should we accept DAGRun here similar to `PATCH /dags/{dag_id}`?
   https://github.com/apache/airflow/blob/fb11ba5c42c2547e6550480eca1c5db14ea76f99/airflow/api_connexion/openapi/v1.yaml#L466







[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r697868482



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,40 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/updateDagRunState:

Review comment:
       This endpoint feels wrong to me. Since the operation only affects one single DagRun, I feel it should be something like `/dags/{dag_id}/dagRuns/{dag_run_id}/state`. And it should only take a single `state` field (not `dag_id` or `dag_run_id`).

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -271,3 +273,33 @@ def post_dag_run(dag_id, session):
         )
 
     raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN)
+    ]
+)
+@provide_session
+def post_set_dag_run_state(dag_id: str, session) -> dict:
+    """Set a state of a dag run."""
+    try:
+        post_body = dagrun_schema.load(request.json, session=session, unknown="include")
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    dag_run_id, state = post_body['run_id'], post_body['state']
+    dag_run: Optional[DagRun] = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()

Review comment:
       This should happen before loading `post_body`. If a DAG does not exist, the endpoint should always fail with 404.
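
The ordering can be sketched without Airflow as follows. The exception classes, the in-memory `DAG_RUNS` store, and the inline body check are all hypothetical stand-ins for the real lookup and schema loading:

```python
class NotFound(Exception):
    """Stand-in for an HTTP 404 error."""


class BadRequest(Exception):
    """Stand-in for an HTTP 400 error."""


# Hypothetical in-memory store keyed by (dag_id, dag_run_id).
DAG_RUNS = {("example_dag", "run_1"): {"state": "running"}}


def update_dag_run_state(dag_id: str, dag_run_id: str, body: dict) -> dict:
    # 1. Resolve the resource first: a missing run is always a 404,
    #    regardless of what the request body contains.
    dag_run = DAG_RUNS.get((dag_id, dag_run_id))
    if dag_run is None:
        raise NotFound(f"Dag Run id {dag_run_id} not found in dag {dag_id}")
    # 2. Only then validate the body, which maps to a 400 on failure.
    state = body.get("state")
    if state not in ("success", "failed"):
        raise BadRequest(f"Invalid state: {state!r}")
    dag_run["state"] = state
    return dag_run
```

With this order, a request for a nonexistent run and a malformed body reports the missing resource rather than the validation error.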







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700055888



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -1996,6 +2031,16 @@ components:
       required:
         - dag_id
 
+    UpdateDagRunState:
+      type: object
+      properties:
+        state:
+          description: The state to set this DagRun
+          type: string
+          enum:
+            - success
+            - failed

Review comment:
       The other values are `queued` and `running`, which are handled by SchedulerJob, so I think it's OK. I agree with you: we should leave it until the need arises.







[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r697898126



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -271,3 +273,27 @@ def post_dag_run(dag_id, session):
         )
 
     raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN)
+    ]
+)
+@provide_session
+def post_set_dag_run_state(dag_id: str, dag_run_id: str, session) -> dict:
+    """Set a state of a dag run."""
+    try:
+        post_body = dagrun_schema.load(request.json, session=session, unknown="include")
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+    dag_run: Optional[DagRun] = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()
+    if dag_run is None:
+        error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
+        raise DagRunNotFound(error_message)

Review comment:
       ```suggestion
       dag_run: Optional[DagRun] = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()
       if dag_run is None:
           error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
           raise DagRunNotFound(error_message)
       try:
           post_body = dagrun_schema.load(request.json, session=session)
       except ValidationError as err:
           raise BadRequest(detail=str(err))
   ```
   
   Also, is the `unknown="include"` argument needed and why?







[GitHub] [airflow] bbenshalom commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
bbenshalom commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700536752



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1154,91 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand([("failed",), ("success",)])
+    @pytest.fixture(scope="module")
+    def test_should_respond_200(self, state, dag_maker):
+        dag_id = "TEST_DAG_ID"
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = dag_maker.create_dagrun()
+            session.add(dag)
+            session.add(dag_run)

Review comment:
       Thanks for the explanation! Makes sense now.







[GitHub] [airflow] bbenshalom commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
bbenshalom commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-906830420


   @ephraimbuddy I changed the endpoint to set a DAG run state and added the appropriate tests





[GitHub] [airflow] github-actions[bot] commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-911563597


   The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700916317



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -674,6 +674,35 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+    patch:
+      summary: Set a state of DAG run
+      description: Set a state of DAG run

Review comment:
       ```suggestion
         summary: Modify a DAG run
         description: Modify a DAG run
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700833831



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -104,6 +105,12 @@ def autofill(self, data, **kwargs):
         return data
 
 
+class SetDagRunStateFormSchema(Schema):

Review comment:
       It should be possible, but I’d say let’s not bother until someone comes up with a feature request 🙂







[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700507659



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1154,91 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand([("failed",), ("success",)])
+    @pytest.fixture(scope="module")
+    def test_should_respond_200(self, state, dag_maker):
+        dag_id = "TEST_DAG_ID"
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = dag_maker.create_dagrun()
+            session.add(dag)
+            session.add(dag_run)

Review comment:
       > How can I inject dag_maker […]?
   
   Just include `dag_maker` as an argument. Pytest magically picks the argument up and injects the fixture into the function. But to make the test compatible with pytest, you need to use `pytest.mark.parametrize` instead of `parameterized.expand`.
   
   So something like
   
   ```python
   @pytest.mark.parametrize("state", [...])
   def test_should_respond_200(self, dag_maker, state):
       ...
   ```
   
   > What is the purpose of DummyOperator here, and why run it separately in a context manager from `dag_maker.create_dagrun()`?
   
   The DummyOperator will be included as a part of the DAG, similar to
   
   ```python
   with DAG(...):
       MyOperator(...)
   
   # The above is equivalent to
   # dag = DAG(...)
   # MyOperator(..., dag=dag)
   ```
   
   It’s not actually used anywhere in the test, so technically you can just do
   
   ```python
   with dag_maker(...):
       pass
   ```
   
   A DAG without any operators is kind of weird, but allowed.
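
To make the `with DAG(...)` equivalence above concrete, here is a minimal sketch of how a context manager can register operators implicitly. The `DAG` and `MyOperator` classes below are toy stand-ins written for this illustration; they are not Airflow's real classes, and Airflow's actual mechanism may differ in detail.

```python
# Toy stand-ins for illustration only; these are NOT Airflow's real classes.
class DAG:
    _active = []  # stack of DAGs whose `with` block is currently open

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []

    def __enter__(self):
        DAG._active.append(self)
        return self

    def __exit__(self, exc_type, exc, tb):
        DAG._active.pop()
        return False  # do not swallow exceptions


class MyOperator:
    def __init__(self, task_id, dag=None):
        # Fall back to the innermost open DAG when none is passed explicitly.
        if dag is None and DAG._active:
            dag = DAG._active[-1]
        self.task_id = task_id
        self.dag = dag
        if dag is not None:
            dag.tasks.append(self)


# Implicit registration through the context manager.
with DAG("implicit") as implicit_dag:
    MyOperator(task_id="a")

# Equivalent explicit form.
explicit_dag = DAG("explicit")
MyOperator(task_id="a", dag=explicit_dag)

# An empty DAG is odd but allowed.
with DAG("empty") as empty_dag:
    pass
```

Both `implicit_dag` and `explicit_dag` end up with one task, and `empty_dag` has none, which mirrors the `with dag_maker(...): pass` case.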







[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700835891



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -104,6 +105,12 @@ def autofill(self, data, **kwargs):
         return data
 
 
+class SetDagRunStateFormSchema(Schema):

Review comment:
       Using `PATCH` is a good idea though. I was leaning toward `POST` under the impression that `dagrun.state = new_state` does not work, but I was wrong; it actually calls `set_state` implicitly.
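
For readers unfamiliar with the pattern, an attribute assignment can be routed through a setter method with a property. The sketch below uses a hypothetical `ToyDagRun` class to show the general idea only; it is not Airflow's `DagRun`, and how Airflow wires this up internally is not shown here.

```python
# Toy model for illustration; not Airflow's DagRun.
class ToyDagRun:
    def __init__(self, state="running"):
        self._state = state
        self.history = []  # records every transition made via set_state

    def get_state(self):
        return self._state

    def set_state(self, state):
        if self._state != state:
            self.history.append((self._state, state))
            self._state = state

    # Plain attribute assignment is routed through set_state.
    state = property(get_state, set_state)


run = ToyDagRun()
run.state = "failed"  # looks like a plain assignment, but calls set_state
```

After the assignment, `run.history` contains the recorded transition, which shows that the setter logic ran.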







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-905972040


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst) Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] bbenshalom commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
bbenshalom commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700900840



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,40 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/state:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:
+      summary: Set a state of DAG run
+      description: Set a state of DAG run
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
+      operationId: update_dag_run_state
+      tags: [UpdateDagRunState]
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/UpdateDagRunState'

Review comment:
       Considering @ephraimbuddy's comment, I'll resolve this conversation without changing the input schema.







[GitHub] [airflow] bbenshalom commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
bbenshalom commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700898490



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,40 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/state:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:

Review comment:
       done







[GitHub] [airflow] bbenshalom commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
bbenshalom commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700493254



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1154,91 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand([("failed",), ("success",)])
+    @pytest.fixture(scope="module")
+    def test_should_respond_200(self, state, dag_maker):
+        dag_id = "TEST_DAG_ID"
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = dag_maker.create_dagrun()
+            session.add(dag)
+            session.add(dag_run)

Review comment:
       @ephraimbuddy I can't get this setup to work and don't really understand it, I guess, so a couple of questions:
   1. How can I inject dag_maker without using `@pytest.fixture`, if I'm also using `@parameterized.expand`? I get the error `missing 1 required positional argument: 'dag_maker'`.
   2. What is the purpose of DummyOperator here, and why run it separately in a context manager from `dag_maker.create_dagrun()`?
   I'm considering reverting this and just creating the DAG using the constructor, like the other tests in this file...







[GitHub] [airflow] uranusjr commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-907690208


   > I removed the `dagrun_schema.load()` part, as I think I don't actually need it, since I already query the dag run and merge it... correct me if I'm wrong
   
   The `load` call performs data sanitization and automatically emits a 400 Bad Request if the user POSTs an invalid state name (say `{"state": "asdfg"}`). You need to keep it.
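
As a rough illustration of why the validation step matters, the sketch below shows a load-then-handle flow in plain Python. Everything here is a stand-in: `ValidationError`, `load`, and `handle` are hypothetical names for this example, not the schema library's or Airflow's API, and the allowed values are the `success`/`failed` pair discussed elsewhere in this review.

```python
ALLOWED_STATES = {"success", "failed"}  # assumption: values from the enum discussed in this review


class ValidationError(Exception):
    """Stand-in for the schema library's validation error."""


def load(payload):
    """Validate the request body and return the cleaned data."""
    if not isinstance(payload, dict) or "state" not in payload:
        raise ValidationError("Missing required field: state")
    state = payload["state"]
    if not isinstance(state, str) or state not in ALLOWED_STATES:
        raise ValidationError(f"Invalid state: {state!r}")
    return {"state": state}


def handle(payload):
    """Hypothetical handler returning (status_code, body)."""
    try:
        data = load(payload)
    except ValidationError as err:
        # Invalid input is rejected before any database object is touched.
        return 400, {"detail": str(err)}
    return 200, data
```

Without the `load` step, a body such as `{"state": "asdfg"}` would reach the state-setting code unchecked.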





[GitHub] [airflow] bbenshalom commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
bbenshalom commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-911635895


   @uranusjr @ephraimbuddy thank you for the review! Can you please approve the workflow? And since I don't have write access, I guess this will be merged by one of the maintainers when appropriate, right?





[GitHub] [airflow] bbenshalom commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
bbenshalom commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-911437241


   @ephraimbuddy any additional requested changes I missed? I see that a request from you is blocking the option to merge





[GitHub] [airflow] bbenshalom commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
bbenshalom commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r697925217



##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -104,6 +104,14 @@ def autofill(self, data, **kwargs):
         return data
 
 
+class SetDagRunStateFormSchema(Schema):
+    """Schema for handling the request of setting state of DAG run"""
+
+    run_id = auto_field(data_key='dag_run_id')
+    dag_id = auto_field(dump_only=True)
+    state = DagStateField(dump_only=True)

Review comment:
       > Also you will still need to satisfy the linter. Again, please consider setting up `pre-commit`.
   
   Ran the static checks locally; I think it should be good now (for some reason they don't run automatically here in GitHub Actions). Thanks!







[GitHub] [airflow] mik-laj commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700826744



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,40 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/state:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:

Review comment:
       Similar to:
   https://github.com/apache/airflow/blob/fb11ba5c42c2547e6550480eca1c5db14ea76f99/airflow/api_connexion/openapi/v1.yaml#L325-L353
   https://github.com/apache/airflow/blob/fb11ba5c42c2547e6550480eca1c5db14ea76f99/airflow/api_connexion/openapi/v1.yaml#L454-L479







[GitHub] [airflow] alex-astronomer commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
alex-astronomer commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-922517598


   This could use more documentation about how to use the endpoint. I'm not sure whether that exists anywhere; I couldn't find it in the `v1.yaml` file. I had to dig into the source code and guess how to use it. Maybe I'm just a beginner, but I think some documentation would go a long way for other beginners.
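
A minimal sketch of what a client call might look like, pieced together from the review above. It assumes the endpoint ended up as a `PATCH` on the DAG run resource with a JSON body of the form `{"state": ...}`; the thread also shows an earlier `POST .../state` variant, so check `v1.yaml` for the version you run. The base URL, DAG ID, and run ID are hypothetical, and authentication is left out because it depends on the configured auth backend.

```python
import json
import urllib.request
from urllib.parse import quote


def build_update_request(base_url, dag_id, dag_run_id, state):
    """Build (but do not send) a request that sets a DAG run's state."""
    # Assumption: PATCH on the DAG run resource, as discussed in the review.
    path = f"/dags/{quote(dag_id, safe='')}/dagRuns/{quote(dag_run_id, safe='')}"
    body = json.dumps({"state": state}).encode("utf-8")
    return urllib.request.Request(
        base_url + path,
        data=body,
        method="PATCH",
        headers={"Content-Type": "application/json"},
    )


# Hypothetical deployment details.
req = build_update_request(
    "http://localhost:8080/api/v1", "example_dag", "manual_run_1", "failed"
)
# urllib.request.urlopen(req) would send it once credentials are added.
```

The run ID is URL-encoded because run IDs often contain characters such as `:` and `+`.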








[GitHub] [airflow] ephraimbuddy merged pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #17839:
URL: https://github.com/apache/airflow/pull/17839


   





[GitHub] [airflow] uranusjr commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700016667



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,41 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/state:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:
+      summary: Set a state of DAG run
+      description: >
+        Set a state of DAG run

Review comment:
       ```suggestion
         description: Set a state of DAG run
   ```
   
   (Nitpicking) No need for the extra wrap.

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1156,111 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand([("failed",), ("success",)])
+    @freeze_time(TestDagRunEndpoint.default_time)
+    def test_should_respond_200(self, state):
+        dag_id = "TEST_DAG_ID"
+        dag_run_id = "TEST_DAG_RUN_ID_1"
+        test_time = timezone.parse(self.default_time)
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = DagRun(
+                dag_id=dag_id,
+                run_id=dag_run_id,
+                state=DagRunState.RUNNING,
+                run_type=DagRunType.MANUAL,
+                execution_date=test_time,
+                start_date=test_time,
+                external_trigger=True,
+            )
+            session.add(dag)
+            session.add(dag_run)

Review comment:
       It’s probably easier to use the `dag_maker` fixture for this (but the current implementation is OK).

##########
File path: airflow/api_connexion/schemas/dag_run_schema.py
##########
@@ -104,6 +105,12 @@ def autofill(self, data, **kwargs):
         return data
 
 
+class SetDagRunStateFormSchema(Schema):
+    """Schema for handling the request of setting state of DAG run"""
+
+    state = DagStateField(validate=validate.OneOf([State.SUCCESS, State.FAILED]))

Review comment:
       Nitpick: Use `DagRunState.SUCCESS.value` (and `FAILED`) instead.

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -271,3 +275,29 @@ def post_dag_run(dag_id, session):
         )
 
     raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+    ]
+)
+@provide_session
+def update_dag_run_state(dag_id: str, dag_run_id: str, session) -> dict:
+    """Set a state of a dag run."""
+    dag_run: Optional[DagRun] = (
+        session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id).one_or_none()
+    )
+    if dag_run is None:
+        error_message = f'Dag Run id {dag_run_id} not found in dag {dag_id}'
+        raise NotFound(error_message)
+    try:
+        post_body = set_dagrun_state_form_schema.load(request.json)
+    except ValidationError as err:
+        raise BadRequest(detail=str(err))
+
+    state = post_body['state']
+    dag_run.set_state(state=DagRunState(state).value)

Review comment:
       ```suggestion
       dag_run.set_state(state=DagRunState(state))
   ```
   
   `set_state` accepts a `DagRunState`, not `str`.
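
To illustrate the difference between passing the enum member and passing its `.value`, here is a small sketch with a hypothetical `ToyDagRunState` enum. It is modelled loosely on the names in this review and is not Airflow's `DagRunState`.

```python
from enum import Enum


# Toy enum for illustration; not Airflow's DagRunState.
class ToyDagRunState(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


state = ToyDagRunState("failed")      # lookup by value yields the enum member
raw = ToyDagRunState("failed").value  # .value converts back to a plain str

try:
    ToyDagRunState("madeUpState")     # unknown values raise ValueError
except ValueError as err:
    error = str(err)
```

Calling `DagRunState(state).value` therefore round-trips to a plain string, while dropping `.value` hands the enum member itself to `set_state`.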

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -1996,6 +2031,16 @@ components:
       required:
         - dag_id
 
+    UpdateDagRunState:
+      type: object
+      properties:
+        state:
+          description: The state to set this DagRun
+          type: string
+          enum:
+            - success
+            - failed

Review comment:
       I wonder if it makes sense to set the state to other valid values. But I guess those can be added later if someone asks for them, so they should not block this PR.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r700356792



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1154,91 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand([("failed",), ("success",)])
+    @pytest.fixture(scope="module")
+    def test_should_respond_200(self, state, dag_maker):
+        dag_id = "TEST_DAG_ID"
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = dag_maker.create_dagrun()
+            session.add(dag)
+            session.add(dag_run)
+
+        request_json = {"state": state}
+
+        response = self.client.post(
+            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1/state",
+            json=request_json,
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+
+        assert response.status_code == 200
+        assert response.json == {
+            'conf': {},
+            'dag_id': 'TEST_DAG_ID',
+            'dag_run_id': 'TEST_DAG_RUN_ID_1',

Review comment:
       ```suggestion
               'dag_id': dag_id,
               'dag_run_id': dag_run_id,
   ```

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1154,91 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand([("failed",), ("success",)])
+    @pytest.fixture(scope="module")
+    def test_should_respond_200(self, state, dag_maker):
+        dag_id = "TEST_DAG_ID"
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = dag_maker.create_dagrun()
+            session.add(dag)
+            session.add(dag_run)
+
+        request_json = {"state": state}
+
+        response = self.client.post(
+            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1/state",

Review comment:
       ```suggestion
               f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/state",
   ```

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1154,91 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand([("failed",), ("success",)])
+    @pytest.fixture(scope="module")
+    def test_should_respond_200(self, state, dag_maker):
+        dag_id = "TEST_DAG_ID"
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = dag_maker.create_dagrun()
+            session.add(dag)
+            session.add(dag_run)

Review comment:
       ```suggestion
       def test_should_respond_200(self, state, dag_maker):
           dag_id = "TEST_DAG_ID"
           dag_run_id = 'TEST_DAG_RUN_ID'
           with dag_maker(dag_id):
               DummyOperator(task_id='task_id')
           dag_maker.create_dagrun(run_id=dag_run_id)
   ```







[GitHub] [airflow] uranusjr commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-907625162


   Also I think the linter will reject your code style. Please consider setting up pre-commit and checking for formatting issues locally; see `CONTRIBUTING.rst` for details.





[GitHub] [airflow] bbenshalom commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
bbenshalom commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-907681943


   @uranusjr I removed the dagrun_schema.load() part, as I think I don't actually need it, since I already query the dag run and merge it... correct me if I'm wrong





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r698295716



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -1996,6 +2031,14 @@ components:
       required:
         - dag_id
 
+    UpdateDagRunState:
+      type: object
+      properties:
+        state:
+          $ref: '#/components/schemas/DagState'

Review comment:
       ```suggestion
             description: The state to set this DagRun
             type: string
             enum:
                 - failed
                 - success
   ```
   Instead of using `DagState`, we should use an enum here to avoid setting the DagRun state to `running` or `queued`, which are also valid `DagState` values.

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -271,3 +275,29 @@ def post_dag_run(dag_id, session):
         )
 
     raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+    ]
+)
+@provide_session
+def post_set_dag_run_state(dag_id: str, dag_run_id: str, session) -> dict:

Review comment:
       ```suggestion
   def update_dag_run_state(dag_id: str, dag_run_id: str, session) -> dict:
   ```

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1156,109 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand(
+        [("TEST_DAG_ID", "TEST_DAG_RUN_ID_1", "failed"), ("TEST_DAG_ID", "TEST_DAG_RUN_ID_1", "success")]
+    )
+    @freeze_time(TestDagRunEndpoint.default_time)
+    def test_should_respond_200(self, dag_id, dag_run_id, state):
+        test_time = timezone.parse(self.default_time)
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = DagRun(
+                dag_id=dag_id,
+                run_id=dag_run_id,
+                state=DagRunState.RUNNING,
+                run_type=DagRunType.MANUAL,
+                execution_date=test_time,
+                start_date=test_time,
+                external_trigger=True,
+            )
+            session.add(dag)
+            session.add(dag_run)
+
+        request_json = {"state": state}
+
+        response = self.client.post(
+            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1/state",
+            json=request_json,
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+
+        assert response.status_code == 200
+        assert response.json == {
+            'conf': {},
+            'dag_id': 'TEST_DAG_ID',
+            'dag_run_id': 'TEST_DAG_RUN_ID_1',
+            'end_date': self.default_time,
+            'execution_date': self.default_time,
+            'external_trigger': True,
+            'logical_date': self.default_time,
+            'start_date': self.default_time,
+            'state': state,
+        }
+
+    def test_should_response_400_for_non_existing_dag_run_state(self):
+        test_time = timezone.parse(self.default_time)
+        with create_session() as session, freeze_time(self.default_time):
+            dag_id = "TEST_DAG_ID"
+            dag_run_id = "TEST_DAG_RUN_ID_1"
+            dag = DagModel(dag_id=dag_id)
+            dag_run = DagRun(
+                dag_id=dag_id,
+                run_id=dag_run_id,
+                state=DagRunState.RUNNING,
+                run_type=DagRunType.MANUAL,
+                execution_date=test_time,
+                start_date=test_time,
+                external_trigger=True,
+            )
+            session.add(dag)
+            session.add(dag_run)
+
+            request_json = {"state": "madeUpState"}

Review comment:
       We should parametrize this and make sure that DagRuns are not set to `queued` or `running`

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,41 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/state:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:
+      summary: Set a state of DAG run
+      description: >
+        Set a state of DAG run
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
+      operationId: post_set_dag_run_state

Review comment:
       ```suggestion
         operationId: update_dag_run_state
   ```







[GitHub] [airflow] alex-astronomer edited a comment on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
alex-astronomer edited a comment on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-922517386


   Also closes: #16412





[GitHub] [airflow] alex-astronomer commented on pull request #17839: Add DAG run abort endpoint

Posted by GitBox <gi...@apache.org>.
alex-astronomer commented on pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#issuecomment-922517386


   Also closes: #17839

