You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/22 09:35:00 UTC

[GitHub] [airflow] OmairK opened a new pull request #9473: [WIP] Dag Runs CRUD endpoints

OmairK opened a new pull request #9473:
URL: https://github.com/apache/airflow/pull/9473


   Closes #9110 
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Target Github ISSUE in description if exists
   - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444682662



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -70,10 +92,14 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
 
     # filter execution date
     if execution_date_gte:
-        query = query.filter(DagRun.execution_date >= conn_parse_datetime(execution_date_gte))
+        query = query.filter(
+            DagRun.execution_date >= conn_parse_datetime(execution_date_gte)

Review comment:
       It looks to me, conflicts was not resolved well on this endpoint. Can you check the one on master. It uses format_parameters to parse date instead of conn_parse_datetime




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] OmairK commented on pull request #9473: Add Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
OmairK commented on pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#issuecomment-654487056


   > Hiello.
   > We have updated FAB to v3, so we use marshmallow v3. Can you rebase on the latest master?
   > Thanks!
   
   Done :smile: 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] OmairK commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
OmairK commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444312038



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -76,10 +78,18 @@ def _create_test_dag_run(self, state='running', extra_dag=False):
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+    @provide_session
+    def test_should_response_200(self, session):
+        session.add_all(self._create_test_dag_run())
+        session.commit()
+        response = self.client.delete("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
         assert response.status_code == 204
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
+        self.assertEqual(response.status_code, 404)

Review comment:
       Fixed f0f9256




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] OmairK commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
OmairK commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r443553805



##########
File path: airflow/api_connexion/openapi/v1.yaml
##########
@@ -276,6 +276,34 @@ paths:
                   - $ref: '#/components/schemas/CollectionInfo'
         '401':
           $ref: '#/components/responses/Unauthenticated'
+    
+    post:
+      summary: Trigger a DAG Run
+      operationId: airflow.api_connexion.endpoints.dag_run_endpoint.post_dag_run
+      tags: [DAGRun]
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/DAGRun'
+      responses:
+        '200':
+          description: Successful response.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/DAGRun'
+        '400':
+          $ref: '#/components/responses/BadRequest'
+        '401':
+          $ref: '#/components/responses/Unauthenticated'
+        '409':
+          $ref: '#/components/responses/AlreadyExists'
+        '403':
+          $ref: '#/components/responses/PermissionDenied'
+        '404':
+          $ref: '#/components/responses/NotFound'

Review comment:
       I moved the `post request` from endpoint `/dags/{dag_id}/dagRuns/{dag_run_id}` to `/dags/{dag_id}/dagRuns` as having dag_run_id as a url parameter as well as in request.body seemed redundant.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] zikun commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
zikun commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r443626814



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -76,10 +78,18 @@ def _create_test_dag_run(self, state='running', extra_dag=False):
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+    @provide_session
+    def test_should_response_200(self, session):
+        session.add_all(self._create_test_dag_run())
+        session.commit()
+        response = self.client.delete("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
         assert response.status_code == 204
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
+        self.assertEqual(response.status_code, 404)
+
+    def test_should_response_404(self):
+        response = self.client.delete("api/v1/dags/INVALID_DAG_RUN/dagRuns/INVALID_DAG_RUN")
+        self.assertEqual(response.status_code, 404)

Review comment:
       ```suggestion
           assert response.status_code == 404
   ```

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -14,23 +14,28 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from flask import request
+from connexion import NoContent
+from sqlalchemy import and_, func 
 
-from sqlalchemy import func
-
-from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.exceptions import NotFound, AlreadyExists
 from airflow.api_connexion.schemas.dag_run_schema import (
     DAGRunCollection, dagrun_collection_schema, dagrun_schema,
 )
 from airflow.api_connexion.utils import conn_parse_datetime
-from airflow.models import DagRun
+from airflow.models import DagRun, DagModel
 from airflow.utils.session import provide_session
+from airflow.utils.types import DagRunType
 
 
-def delete_dag_run():
+@provide_session
+def delete_dag_run(dag_id, dag_run_id, session):
     """
     Delete a DAG Run
     """
-    raise NotImplementedError("Not implemented yet.")
+    if session.query(DagRun).filter(and_(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)).delete() == 0:
+        raise NotFound("DAGRun not found")

Review comment:
       Maybe to be more precise and to be consistent with line 127:
   ```suggestion
           raise NotFound(f"DAGRun with DAG ID:{dag_id} and DAGRun ID:{dag_run_id} not found")
   ```

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -354,7 +364,17 @@ def test_should_response_200(self):
 
 
 class TestPostDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.post("/dags/TEST_DAG_ID/dagRuns")
+    @provide_session
+    def test_should_response_200(self, session):
+        dag_instance = DagModel(dag_id="TEST_DAG_ID")
+        session.add(dag_instance)
+        session.commit()
+        response = self.client.post("api/v1/dags/TEST_DAG_ID/dagRuns", json={"dag_run_id": "TEST_DAG_RUN", "execution_date": self.default_time, "state": "failed"})
         assert response.status_code == 200
+    
+    def test_response_404(self):
+        response = self.client.post("api/v1/dags/TEST_DAG_ID/dagRuns", json={"dag_run_id": "TEST_DAG_RUN", "execution_date": self.default_time, "state": "failed"})
+        assert response.status_code == 404
+        self.assertEqual(
+            {'detail': None, 'status': 404, 'title': 'DAG with dag_id: TEST_DAG_ID not found', 'type': 'about:blank'}, response.json
+        )

Review comment:
       ```suggestion
           assert response.json == {
               'detail': None,
               'status': 404,
               'title': 'DAG with dag_id: TEST_DAG_ID not found',
               'type': 'about:blank',
           }
   ```

##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -76,10 +78,18 @@ def _create_test_dag_run(self, state='running', extra_dag=False):
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+    @provide_session
+    def test_should_response_200(self, session):
+        session.add_all(self._create_test_dag_run())
+        session.commit()
+        response = self.client.delete("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
         assert response.status_code == 204
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
+        self.assertEqual(response.status_code, 404)

Review comment:
       ```suggestion
                   assert response.status_code == 404
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on pull request #9473: Add Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#issuecomment-654406329


   Hiello.
   We have updated FAB to v3, so we use marshmallow v3. Can you rebase on the latest master?
   Thanks!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj merged pull request #9473: Add Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #9473:
URL: https://github.com/apache/airflow/pull/9473


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] OmairK commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
OmairK commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444303716



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -354,7 +364,17 @@ def test_should_response_200(self):
 
 
 class TestPostDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.post("/dags/TEST_DAG_ID/dagRuns")
+    @provide_session
+    def test_should_response_200(self, session):
+        dag_instance = DagModel(dag_id="TEST_DAG_ID")
+        session.add(dag_instance)
+        session.commit()
+        response = self.client.post("api/v1/dags/TEST_DAG_ID/dagRuns", json={"dag_run_id": "TEST_DAG_RUN", "execution_date": self.default_time, "state": "failed"})
         assert response.status_code == 200
+    
+    def test_response_404(self):
+        response = self.client.post("api/v1/dags/TEST_DAG_ID/dagRuns", json={"dag_run_id": "TEST_DAG_RUN", "execution_date": self.default_time, "state": "failed"})
+        assert response.status_code == 404
+        self.assertEqual(
+            {'detail': None, 'status': 404, 'title': 'DAG with dag_id: TEST_DAG_ID not found', 'type': 'about:blank'}, response.json
+        )

Review comment:
       Fixed thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r447425987



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -70,10 +92,14 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
 
     # filter execution date
     if execution_date_gte:
-        query = query.filter(DagRun.execution_date >= conn_parse_datetime(execution_date_gte))
+        query = query.filter(
+            DagRun.execution_date >= conn_parse_datetime(execution_date_gte)

Review comment:
       Can you do a rebase? I would like to slowly finish this change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] OmairK commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
OmairK commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444311320



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -76,10 +78,18 @@ def _create_test_dag_run(self, state='running', extra_dag=False):
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
-        response = self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+    @provide_session
+    def test_should_response_200(self, session):
+        session.add_all(self._create_test_dag_run())
+        session.commit()
+        response = self.client.delete("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
         assert response.status_code == 204
+        response = self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
+        self.assertEqual(response.status_code, 404)
+
+    def test_should_response_404(self):
+        response = self.client.delete("api/v1/dags/INVALID_DAG_RUN/dagRuns/INVALID_DAG_RUN")
+        self.assertEqual(response.status_code, 404)

Review comment:
       Fixed thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] OmairK commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
OmairK commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444686422



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -70,10 +92,14 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
 
     # filter execution date
     if execution_date_gte:
-        query = query.filter(DagRun.execution_date >= conn_parse_datetime(execution_date_gte))
+        query = query.filter(
+            DagRun.execution_date >= conn_parse_datetime(execution_date_gte)

Review comment:
       I haven't rebased it yet, this will be fixed once I do that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on pull request #9473: Add Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#issuecomment-654487881


   🤞 
   I am waiting for CI results.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444687934



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -70,10 +92,14 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None,
 
     # filter execution date
     if execution_date_gte:
-        query = query.filter(DagRun.execution_date >= conn_parse_datetime(execution_date_gte))
+        query = query.filter(
+            DagRun.execution_date >= conn_parse_datetime(execution_date_gte)

Review comment:
       Oh, that's nice. Brilliant idea. Haha. I use to rebase everytime and having issues. Will start using this method. Thanks @Omairk :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] OmairK commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

Posted by GitBox <gi...@apache.org>.
OmairK commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444302962



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -14,23 +14,28 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from flask import request
+from connexion import NoContent
+from sqlalchemy import and_, func 
 
-from sqlalchemy import func
-
-from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.exceptions import NotFound, AlreadyExists
 from airflow.api_connexion.schemas.dag_run_schema import (
     DAGRunCollection, dagrun_collection_schema, dagrun_schema,
 )
 from airflow.api_connexion.utils import conn_parse_datetime
-from airflow.models import DagRun
+from airflow.models import DagRun, DagModel
 from airflow.utils.session import provide_session
+from airflow.utils.types import DagRunType
 
 
-def delete_dag_run():
+@provide_session
+def delete_dag_run(dag_id, dag_run_id, session):
     """
     Delete a DAG Run
     """
-    raise NotImplementedError("Not implemented yet.")
+    if session.query(DagRun).filter(and_(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)).delete() == 0:
+        raise NotFound("DAGRun not found")

Review comment:
       Thanks fixed `5e70d02 `




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org