Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/03 13:09:45 UTC

[GitHub] [airflow] vikramcse opened a new pull request #15174: raise AlreadyExists exception when the execution_date is same

vikramcse opened a new pull request #15174:
URL: https://github.com/apache/airflow/pull/15174


   This issue occurs when the execution_date is the same as that of a previous DAG run. In that case, raise an AlreadyExists exception with a 409 status code instead of returning a 500 error.
   FIXES: #15150
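A minimal sketch of the behaviour this PR describes, in plain Python rather than Airflow. The endpoint detects the duplicate before inserting and reports it as a 409 Conflict with a `detail` message, so the caller never sees a 500. The set-based store, the `handle` helper and the 200 success status are assumptions of this sketch. `AlreadyExists` here is a stand-in class, not Airflow's exception.

```python
class AlreadyExists(Exception):
    """Hypothetical stand-in for an API error that maps to HTTP 409 Conflict."""

    status = 409


def post_dag_run(existing_runs, dag_id, execution_date):
    """Sketch of the endpoint: reject a duplicate instead of failing on insert.

    `existing_runs` is a set of (dag_id, execution_date) pairs standing in
    for the dag_run table (an assumption of this sketch).
    """
    if (dag_id, execution_date) in existing_runs:
        raise AlreadyExists(
            f"A DagRun with dag_id: '{dag_id}' and "
            f"execution_date: '{execution_date}' already exists"
        )
    existing_runs.add((dag_id, execution_date))
    return {"dag_id": dag_id, "execution_date": execution_date}


def handle(existing_runs, dag_id, execution_date):
    """Translate the outcome into (status_code, body), as a web layer might."""
    try:
        return 200, post_dag_run(existing_runs, dag_id, execution_date)
    except AlreadyExists as err:
        return err.status, {"detail": str(err)}
```

Calling `handle` twice with the same DAG ID and date returns a success first and then `(409, {"detail": ...})`. The same date for a different DAG is still accepted, because the uniqueness is per DAG.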


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #15174: raise AlreadyExists exception when the execution_date is same

ephraimbuddy commented on a change in pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#discussion_r606672209



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -240,6 +240,18 @@ def post_dag_run(dag_id, session):
         post_body = dagrun_schema.load(request.json, session=session)
     except ValidationError as err:
         raise BadRequest(detail=str(err))
+
+    dagrun_with_execution_date = (
+        session.query(DagRun).filter(DagRun.dag_id == dag_id,
+                                     DagRun.execution_date == post_body["execution_date"]).first()
+    )
+
+    if dagrun_with_execution_date:
+        raise AlreadyExists(
+            detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ExecutionDate: '{post_body['execution_date']}' already "
+                   f"exists"
+        )
+

Review comment:
       Can you add execution_date to the filter in line 256 instead of another query?







[GitHub] [airflow] kaxil merged pull request #15174: raise AlreadyExists exception when the execution_date is same

kaxil merged pull request #15174:
URL: https://github.com/apache/airflow/pull/15174


   





[GitHub] [airflow] vikramcse commented on a change in pull request #15174: raise AlreadyExists exception when the execution_date is same

vikramcse commented on a change in pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#discussion_r606760000



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -240,6 +240,18 @@ def post_dag_run(dag_id, session):
         post_body = dagrun_schema.load(request.json, session=session)
     except ValidationError as err:
         raise BadRequest(detail=str(err))
+
+    dagrun_with_execution_date = (
+        session.query(DagRun).filter(DagRun.dag_id == dag_id,
+                                     DagRun.execution_date == post_body["execution_date"]).first()
+    )
+
+    if dagrun_with_execution_date:
+        raise AlreadyExists(
+            detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ExecutionDate: '{post_body['execution_date']}' already "
+                   f"exists"
+        )
+

Review comment:
       Thanks @ephraimbuddy!
   I have added these changes.







[GitHub] [airflow] vikramcse commented on a change in pull request #15174: raise AlreadyExists exception when the execution_date is same

vikramcse commented on a change in pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#discussion_r606796068



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -240,14 +241,26 @@ def post_dag_run(dag_id, session):
         post_body = dagrun_schema.load(request.json, session=session)
     except ValidationError as err:
         raise BadRequest(detail=str(err))
+
     dagrun_instance = (
-        session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == post_body["run_id"]).first()
+        session.query(DagRun)
+        .filter(
+            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"])
+        )

Review comment:
       @kaxil, apologies for the confusion.
   I mistakenly left out the `dag_id` filter. I have added the condition back to the query.
   Thank you very much for suggesting the changes.







[GitHub] [airflow] github-actions[bot] commented on pull request #15174: raise AlreadyExists exception when the execution_date is same

github-actions[bot] commented on pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#issuecomment-812868176


   [The Workflow run](https://github.com/apache/airflow/actions/runs/714413466) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #15174: raise AlreadyExists exception when the execution_date is same

github-actions[bot] commented on pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#issuecomment-814075737


   The PR is likely OK to be merged with just a subset of tests for the default Python and database versions, without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase onto the latest master or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] vikramcse commented on a change in pull request #15174: raise AlreadyExists exception when the execution_date is same

vikramcse commented on a change in pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#discussion_r606673340



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -240,6 +240,18 @@ def post_dag_run(dag_id, session):
         post_body = dagrun_schema.load(request.json, session=session)
     except ValidationError as err:
         raise BadRequest(detail=str(err))
+
+    dagrun_with_execution_date = (
+        session.query(DagRun).filter(DagRun.dag_id == dag_id,
+                                     DagRun.execution_date == post_body["execution_date"]).first()
+    )
+
+    if dagrun_with_execution_date:
+        raise AlreadyExists(
+            detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ExecutionDate: '{post_body['execution_date']}' already "
+                   f"exists"
+        )
+

Review comment:
       I originally decided to go with that logic, but there are two cases that can happen:
   1. DagRun.run_id is the same and DagRun.execution_date is the same
   2. DagRun.run_id is different and DagRun.execution_date is the same
   
   In both cases it should raise the execution_date error, because there are two unique constraints:
   
   UniqueConstraint('dag_id', 'execution_date')
   UniqueConstraint('dag_id', 'run_id')
   
   If I added DagRun.execution_date to the same query on line 256, we would miss the AlreadyExists error when DagRun.run_id is different: `dagrun_instance` would be None, and the endpoint would try to create the DagRun anyway, hitting the unique constraint.
   
   Let me know if anything is wrong with my logic.
   Thanks
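The case analysis above can be checked with two plain predicates standing in for the SQL filters. This is a sketch: the dict row and predicate names are illustrative, not Airflow code, and the `dag_id` clause is left out because both cases concern the same DAG. ANDing `execution_date` onto the `run_id` filter finds nothing in case 2. ORing the two columns (one clause per unique constraint) finds the conflicting run in both cases.

```python
from datetime import datetime, timezone

# Hypothetical run already stored for the DAG (dag_id filtering is omitted here).
EXISTING = {
    "run_id": "TEST_DAG_RUN_ID",
    "execution_date": datetime(2020, 6, 11, 18, tzinfo=timezone.utc),
}


def matches_and(run, run_id, execution_date):
    """execution_date added to the run_id filter: both must match."""
    return run["run_id"] == run_id and run["execution_date"] == execution_date


def matches_or(run, run_id, execution_date):
    """One clause per unique constraint: either match is a conflict."""
    return run["run_id"] == run_id or run["execution_date"] == execution_date


SAME_DATE = EXISTING["execution_date"]
CASES = {
    "case 1": ("TEST_DAG_RUN_ID", SAME_DATE),    # same run_id, same execution_date
    "case 2": ("TEST_DAG_RUN_ID_2", SAME_DATE),  # new run_id, same execution_date
}

# Maps each case to (found with AND, found with OR).
RESULTS = {
    name: (matches_and(EXISTING, run_id, date), matches_or(EXISTING, run_id, date))
    for name, (run_id, date) in CASES.items()
}
# RESULTS == {'case 1': (True, True), 'case 2': (False, True)}
```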







[GitHub] [airflow] kaxil commented on pull request #15174: raise AlreadyExists exception when the execution_date is same

kaxil commented on pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#issuecomment-814064818


   cc @ephraimbuddy since you have a requested change 





[GitHub] [airflow] kaxil commented on a change in pull request #15174: raise AlreadyExists exception when the execution_date is same

kaxil commented on a change in pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#discussion_r606814825



##########
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##########
@@ -1013,6 +1014,27 @@ def test_response_409(self):
             "type": EXCEPTIONS_LINK_MAP[409],
         }
 
+    def test_response_409_when_execution_date_is_same(self):
+        self._create_test_dag_run()
+
+        response = self.client.post(
+            "api/v1/dags/TEST_DAG_ID/dagRuns",
+            json={
+                "dag_run_id": "TEST_DAG_RUN_ID_2",
+                "execution_date": self.default_time,
+            },
+            environ_overrides={'REMOTE_USER': "test"},
+        )
+
+        assert response.status_code == 409, response.data
+        assert response.json == {
+            "detail": "DAGRun with DAG ID: 'TEST_DAG_ID' and "
+            "DAGRun ExecutionDate: '2020-06-11 18:00:00+00:00' already exists",

Review comment:
       ```suggestion
               "detail": "A DagRun with dag_id: 'TEST_DAG_ID' and "
               "execution_date: '2020-06-11 18:00:00+00:00' already exists",
   ```

##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -240,14 +241,27 @@ def post_dag_run(dag_id, session):
         post_body = dagrun_schema.load(request.json, session=session)
     except ValidationError as err:
         raise BadRequest(detail=str(err))
+
     dagrun_instance = (
-        session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == post_body["run_id"]).first()
+        session.query(DagRun)
+        .filter(
+            DagRun.dag_id == dag_id,
+            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"]),
+        )
+        .first()
     )
     if not dagrun_instance:
         dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, **post_body)
         session.add(dag_run)
         session.commit()
         return dagrun_schema.dump(dag_run)
+
+    if dagrun_instance.execution_date == post_body["execution_date"]:
+        raise AlreadyExists(
+            detail=f"DAGRun with DAG ID: '{dag_id}' and "
+            f"DAGRun ExecutionDate: '{post_body['execution_date']}' already exists"

Review comment:
       ```suggestion
               detail=f"A DagRun with dag_id: '{dag_id}' and "
               f"execution_date: '{post_body['execution_date']}' already exists"
   ```
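After the OR lookup returns a row, the endpoint still has to decide which constraint was hit. The following sketch mirrors the `if dagrun_instance.execution_date == post_body["execution_date"]` branch above. The `Run` tuple and `conflict_detail` are hypothetical. The run_id message wording is an assumption, since that message is not shown in this thread. The execution_date message uses the wording from the review suggestion.

```python
from collections import namedtuple

# Hypothetical minimal row type standing in for the DagRun returned by the query.
Run = namedtuple("Run", ["run_id", "execution_date"])


def conflict_detail(dag_id, existing, run_id, execution_date):
    """Return the 409 detail for the row found by the OR query, or None."""
    if existing is None:
        return None  # no conflict: the endpoint would create the run
    # Check execution_date first, so a row matching on both keys (case 1)
    # is reported as an execution_date conflict.
    if existing.execution_date == execution_date:
        return (
            f"A DagRun with dag_id: '{dag_id}' and "
            f"execution_date: '{execution_date}' already exists"
        )
    # Otherwise the row can only have matched on run_id (hypothetical wording).
    return f"A DagRun with dag_id: '{dag_id}' and run_id: '{run_id}' already exists"
```

`.first()` returns a single row. A request could match one existing run on run_id and a different run on execution_date. In that case it may be reported either way, depending on which row comes back. The response is a 409 in both cases; only the detail text differs.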







[GitHub] [airflow] kaxil commented on pull request #15174: raise AlreadyExists exception when the execution_date is same

kaxil commented on pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#issuecomment-813513757


   Test is failing on SQLite -- can you double-check please: https://github.com/apache/airflow/pull/15174/checks?check_run_id=2264823977#step:6:2732





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #15174: raise AlreadyExists exception when the execution_date is same

ephraimbuddy commented on a change in pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#discussion_r606709237



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -240,6 +240,18 @@ def post_dag_run(dag_id, session):
         post_body = dagrun_schema.load(request.json, session=session)
     except ValidationError as err:
         raise BadRequest(detail=str(err))
+
+    dagrun_with_execution_date = (
+        session.query(DagRun).filter(DagRun.dag_id == dag_id,
+                                     DagRun.execution_date == post_body["execution_date"]).first()
+    )
+
+    if dagrun_with_execution_date:
+        raise AlreadyExists(
+            detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ExecutionDate: '{post_body['execution_date']}' already "
+                   f"exists"
+        )
+

Review comment:
       Oh, I see; I just took a look at the table. In that case, this will work instead of a separate query:
   ```
   # or_ is sqlalchemy's or_(): one clause per unique constraint, scoped to this DAG
   (
       session.query(DagRun)
       .filter(DagRun.dag_id == dag_id)
       .filter(or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"]))
       .first()
   )
   ```
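The sketch below runs the suggested single query against real constraints. It uses Python's built-in `sqlite3` module instead of SQLAlchemy. The table is a simplified assumption with only the needed columns, but it declares the two unique constraints quoted in this thread. `find_conflict` is a raw-SQL equivalent of the `or_` filter above. `insert_unchecked` shows what happens when no lookup is done first: the database rejects the row with an `IntegrityError`. Left unhandled, that error is the likely source of the 500 response described in the PR.

```python
import sqlite3

# In-memory table with the two per-DAG unique constraints; the column set is a
# simplified assumption, not Airflow's real dag_run schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dag_run (
        id INTEGER PRIMARY KEY,
        dag_id TEXT NOT NULL,
        run_id TEXT NOT NULL,
        execution_date TEXT NOT NULL,
        UNIQUE (dag_id, execution_date),
        UNIQUE (dag_id, run_id)
    )
    """
)
conn.execute(
    "INSERT INTO dag_run (dag_id, run_id, execution_date) VALUES (?, ?, ?)",
    ("TEST_DAG_ID", "TEST_DAG_RUN_ID", "2020-06-11T18:00:00+00:00"),
)


def find_conflict(conn, dag_id, run_id, execution_date):
    """One lookup: same DAG, and either unique key already taken."""
    return conn.execute(
        "SELECT run_id, execution_date FROM dag_run "
        "WHERE dag_id = ? AND (run_id = ? OR execution_date = ?) LIMIT 1",
        (dag_id, run_id, execution_date),
    ).fetchone()


def insert_unchecked(conn, dag_id, run_id, execution_date):
    """Insert without a pre-check and report what the database does."""
    try:
        conn.execute(
            "INSERT INTO dag_run (dag_id, run_id, execution_date) VALUES (?, ?, ?)",
            (dag_id, run_id, execution_date),
        )
    except sqlite3.IntegrityError as err:
        return f"IntegrityError: {err}"
    return "inserted"
```

With a new run_id but the stored date, `find_conflict` returns the existing `("TEST_DAG_RUN_ID", ...)` row. This is case 2, which an AND filter would miss. `insert_unchecked` with the same arguments reports an `IntegrityError` instead.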
   







[GitHub] [airflow] kaxil commented on a change in pull request #15174: raise AlreadyExists exception when the execution_date is same

kaxil commented on a change in pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#discussion_r606788179



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -240,14 +241,26 @@ def post_dag_run(dag_id, session):
         post_body = dagrun_schema.load(request.json, session=session)
     except ValidationError as err:
         raise BadRequest(detail=str(err))
+
     dagrun_instance = (
-        session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.run_id == post_body["run_id"]).first()
+        session.query(DagRun)
+        .filter(
+            or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"])
+        )

Review comment:
       Why not include `dag_id` as a filter? Why scan all DAG IDs?
   
   ```suggestion
           .filter(DagRun.dag_id == dag_id,
               or_(DagRun.run_id == post_body["run_id"], DagRun.execution_date == post_body["execution_date"])
           )
   ```
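The `dag_id` clause does more than avoid a scan over every DAG's runs; it also matters for correctness. Both unique constraints are scoped per DAG. Without the clause, a run of another DAG on the same execution_date would be reported as a conflict. Here is a sketch with a hypothetical `lookup` helper over dict rows; the `scope_to_dag` flag is illustrative only.

```python
from datetime import datetime, timezone

JUNE_11 = datetime(2020, 6, 11, 18, tzinfo=timezone.utc)

# Hypothetical table contents: only a run of a *different* DAG exists on that date.
ROWS = [{"dag_id": "OTHER_DAG", "run_id": "manual_1", "execution_date": JUNE_11}]


def lookup(rows, dag_id, run_id, execution_date, scope_to_dag=True):
    """Return the first row matching run_id OR execution_date.

    With scope_to_dag=False the dag_id clause is dropped, as in the
    revision this review comment is about.
    """
    for row in rows:
        if scope_to_dag and row["dag_id"] != dag_id:
            continue
        if row["run_id"] == run_id or row["execution_date"] == execution_date:
            return row
    return None
```

`lookup(ROWS, "TEST_DAG_ID", "manual_2", JUNE_11)` returns `None`, so the new run may be created. With `scope_to_dag=False`, the same call returns the `OTHER_DAG` row and would produce a spurious 409.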







[GitHub] [airflow] kaxil commented on a change in pull request #15174: raise AlreadyExists exception when the execution_date is same

kaxil commented on a change in pull request #15174:
URL: https://github.com/apache/airflow/pull/15174#discussion_r606788324



##########
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##########
@@ -240,6 +240,18 @@ def post_dag_run(dag_id, session):
         post_body = dagrun_schema.load(request.json, session=session)
     except ValidationError as err:
         raise BadRequest(detail=str(err))
+
+    dagrun_with_execution_date = (
+        session.query(DagRun).filter(DagRun.dag_id == dag_id,
+                                     DagRun.execution_date == post_body["execution_date"]).first()
+    )
+
+    if dagrun_with_execution_date:
+        raise AlreadyExists(
+            detail=f"DAGRun with DAG ID: '{dag_id}' and DAGRun ExecutionDate: '{post_body['execution_date']}' already "
+                   f"exists"
+        )
+

Review comment:
       Agree with @ephraimbuddy: you are missing the `.filter(DagRun.dag_id == dag_id)` that Ephraim suggested.



