Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/30 08:31:31 UTC

[GitHub] [airflow] ephraimbuddy commented on a change in pull request #17839: Add DAG run abort endpoint

ephraimbuddy commented on a change in pull request #17839:
URL: https://github.com/apache/airflow/pull/17839#discussion_r698295716



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -1996,6 +2031,14 @@ components:
       required:
         - dag_id
 
+    UpdateDagRunState:
+      type: object
+      properties:
+        state:
+          $ref: '#/components/schemas/DagState'

Review comment:
   ```suggestion
             description: The state to set this DagRun
             type: string
             enum:
                 - failed
                 - success
   ```
   Instead of referencing `DagState`, we should define an explicit enum here so that a DagRun cannot be set to `running` or `queued`, which are also valid `DagState` values.
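
The restriction suggested above can be illustrated outside the OpenAPI spec with a minimal standard-library sketch. The names `SettableDagRunState` and `parse_requested_state` are hypothetical and are not part of Airflow; the only assumption taken from this review is that `failed` and `success` are the two permitted values.

```python
from enum import Enum


class SettableDagRunState(str, Enum):
    """Hypothetical enum: the only states a client may set on a DagRun."""

    FAILED = "failed"
    SUCCESS = "success"


def parse_requested_state(raw: str) -> SettableDagRunState:
    """Return the matching enum member, or raise ValueError for anything else."""
    try:
        return SettableDagRunState(raw)
    except ValueError:
        allowed = ", ".join(s.value for s in SettableDagRunState)
        # Re-raise with a message that lists the permitted values.
        raise ValueError(f"'{raw}' is not one of: {allowed}") from None
```

With this shape, `running` and `queued` are rejected for the same reason as any made-up value: they are simply not members of the enum.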

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -271,3 +275,29 @@ def post_dag_run(dag_id, session):
         )
 
     raise AlreadyExists(detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ID: '{run_id}' already exists")
+
+
+@security.requires_access(
+    [
+        (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+    ]
+)
+@provide_session
+def post_set_dag_run_state(dag_id: str, dag_run_id: str, session) -> dict:

Review comment:
   ```suggestion
   def update_dag_run_state(dag_id: str, dag_run_id: str, session) -> dict:
   ```

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1154,3 +1156,109 @@ def test_should_raises_403_unauthorized(self):
             environ_overrides={'REMOTE_USER': "test_view_dags"},
         )
         assert response.status_code == 403
+
+
+class TestPostSetDagRunState(TestDagRunEndpoint):
+    @parameterized.expand(
+        [("TEST_DAG_ID", "TEST_DAG_RUN_ID_1", "failed"), ("TEST_DAG_ID", "TEST_DAG_RUN_ID_1", "success")]
+    )
+    @freeze_time(TestDagRunEndpoint.default_time)
+    def test_should_respond_200(self, dag_id, dag_run_id, state):
+        test_time = timezone.parse(self.default_time)
+        with create_session() as session:
+            dag = DagModel(dag_id=dag_id)
+            dag_run = DagRun(
+                dag_id=dag_id,
+                run_id=dag_run_id,
+                state=DagRunState.RUNNING,
+                run_type=DagRunType.MANUAL,
+                execution_date=test_time,
+                start_date=test_time,
+                external_trigger=True,
+            )
+            session.add(dag)
+            session.add(dag_run)
+
+        request_json = {"state": state}
+
+        response = self.client.post(
+            "api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1/state",
+            json=request_json,
+            environ_overrides={"REMOTE_USER": "test"},
+        )
+
+        assert response.status_code == 200
+        assert response.json == {
+            'conf': {},
+            'dag_id': 'TEST_DAG_ID',
+            'dag_run_id': 'TEST_DAG_RUN_ID_1',
+            'end_date': self.default_time,
+            'execution_date': self.default_time,
+            'external_trigger': True,
+            'logical_date': self.default_time,
+            'start_date': self.default_time,
+            'state': state,
+        }
+
+    def test_should_response_400_for_non_existing_dag_run_state(self):
+        test_time = timezone.parse(self.default_time)
+        with create_session() as session, freeze_time(self.default_time):
+            dag_id = "TEST_DAG_ID"
+            dag_run_id = "TEST_DAG_RUN_ID_1"
+            dag = DagModel(dag_id=dag_id)
+            dag_run = DagRun(
+                dag_id=dag_id,
+                run_id=dag_run_id,
+                state=DagRunState.RUNNING,
+                run_type=DagRunType.MANUAL,
+                execution_date=test_time,
+                start_date=test_time,
+                external_trigger=True,
+            )
+            session.add(dag)
+            session.add(dag_run)
+
+            request_json = {"state": "madeUpState"}

Review comment:
   We should parametrize this test and also verify that a DagRun cannot be set to `queued` or `running`.
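
A rough sketch of what such a parametrized check could look like, using only `unittest` so that it runs on its own. The helper `status_for_state` is a hypothetical stand-in for the endpoint call and is not the PR's code; in the actual test module the same list of states would more likely be fed through `parameterized.expand`, as the 200 test already does, and posted to the `/state` endpoint.

```python
import unittest

# Assumption from this review: only these two values may be set by a client.
ALLOWED_STATES = {"failed", "success"}


def status_for_state(state: str) -> int:
    """Hypothetical stand-in for the endpoint: 200 if allowed, otherwise 400."""
    return 200 if state in ALLOWED_STATES else 400


class TestRejectedStates(unittest.TestCase):
    def test_rejected_states_return_400(self):
        # One sub-test per state, so a failure names the offending value.
        for state in ("madeUpState", "queued", "running"):
            with self.subTest(state=state):
                self.assertEqual(status_for_state(state), 400)
```

The point of the extra cases is that `queued` and `running` are real states, so they would slip past a test that only tries an obviously invalid value.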

##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -604,6 +604,41 @@ paths:
         '404':
           $ref: '#/components/responses/NotFound'
 
+  /dags/{dag_id}/dagRuns/{dag_run_id}/state:
+    parameters:
+      - $ref: '#/components/parameters/DAGID'
+      - $ref: '#/components/parameters/DAGRunID'
+
+    post:
+      summary: Set a state of DAG run
+      description: >
+        Set a state of DAG run
+      x-openapi-router-controller: airflow.api_connexion.endpoints.dag_run_endpoint
+      operationId: post_set_dag_run_state

Review comment:
   ```suggestion
         operationId: update_dag_run_state
   ```
