Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/01/07 21:47:02 UTC

[GitHub] [airflow] SamWheating opened a new pull request #20758: Avoid unintentional data loss when deleting DAGs

SamWheating opened a new pull request #20758:
URL: https://github.com/apache/airflow/pull/20758


   We encountered some data loss today when a user deleted a DAG called `project.load` from the UI. The deletion also removed all of the history for two other DAGs, `project.load.bigquery` and `project.load.trino`, and because their run history was reset, those DAGs then ran unexpectedly.
   
   Note - we don't use SubDAGs, we're just using `.` in the DAG ID as a separator for a hierarchical naming system.
   
   As it turns out, deleting a DAG `my_dag` will delete all of the metadata for any DAG which starts with `my_dag.`, as it is assumed that the latter are subdags of the former:
   
   ```python
               cond = or_(model.dag_id == dag_id, model.dag_id.like(dag_id + ".%"))
               count += session.query(model).filter(cond).delete(synchronize_session='fetch')
   ```
   
   That assumption doesn't always hold.
   
   This PR changes the `delete_dag` function so that it deletes only the intended DAG plus any DAGs starting with `<dag_id>.` _which are also SubDAGs_. There may still be other edge cases where DAGs can be unintentionally deleted, but this patches the most apparent one.
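
To make the difference between the two filters concrete, here is a minimal sketch using the standard-library `sqlite3` module rather than Airflow's SQLAlchemy models. The `dag` table, its two columns, and the `project.load.section_1` row are assumptions made up for illustration; only the shape of the WHERE clauses mirrors the code discussed above.

```python
import sqlite3

# Hypothetical stand-in for Airflow's DAG table; only the two columns
# relevant to this discussion are modelled.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_subdag INTEGER)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?)",
    [
        ("project.load", 0),
        ("project.load.bigquery", 0),   # independent DAG sharing the prefix
        ("project.load.trino", 0),      # independent DAG sharing the prefix
        ("project.load.section_1", 1),  # an actual SubDAG of project.load
    ],
)


def matched(where_clause, params):
    """Return the sorted dag_ids selected by the given WHERE clause."""
    rows = conn.execute(
        f"SELECT dag_id FROM dag WHERE {where_clause} ORDER BY dag_id", params
    )
    return [dag_id for (dag_id,) in rows]


target = "project.load"
prefix = target + ".%"

# Old behaviour: every DAG sharing the dotted prefix is selected.
old = matched("dag_id = ? OR dag_id LIKE ?", (target, prefix))

# Behaviour proposed here: the prefix match only counts for SubDAGs.
new = matched("dag_id = ? OR (dag_id LIKE ? AND is_subdag = 1)", (target, prefix))

print(old)
print(new)
```

With the old filter all four rows are selected; with the stricter one only `project.load` and its SubDAG are.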
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on a change in pull request #20758: Avoid unintentional data loss when deleting DAGs

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #20758:
URL: https://github.com/apache/airflow/pull/20758#discussion_r780900806



##########
File path: airflow/api/common/delete_dag.py
##########
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
+    # deleting a DAG should also all of its subdags

Review comment:
       ```suggestion
       # deleting a DAG should also delete all of its subdags
   ```










[GitHub] [airflow] SamWheating commented on a change in pull request #20758: Avoid unintentional data loss when deleting DAGs

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #20758:
URL: https://github.com/apache/airflow/pull/20758#discussion_r780838423



##########
File path: airflow/api/common/delete_dag.py
##########
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
+    # deleting a DAG should also all of its subdags
+    dags_to_delete = [
+        d[0]
+        for d in session.query(DagModel.dag_id)
+        .filter(and_(DagModel.dag_id.like(dag_id + ".%"), DagModel.is_subdag))
+        .all()
+    ]
+    dags_to_delete.append(dag_id)

Review comment:
       Just tested, and this totally works. I've updated my changes.







[GitHub] [airflow] uranusjr commented on a change in pull request #20758: Avoid unintentional data loss when deleting DAGs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20758:
URL: https://github.com/apache/airflow/pull/20758#discussion_r780624363



##########
File path: airflow/api/common/delete_dag.py
##########
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
+    # deleting a DAG should also all of its subdags
+    dags_to_delete = [
+        d[0]
+        for d in session.query(DagModel.dag_id)
+        .filter(and_(DagModel.dag_id.like(dag_id + ".%"), DagModel.is_subdag))
+        .all()
+    ]
+    dags_to_delete.append(dag_id)

Review comment:
       ```suggestion
       dags_to_delete_query = session.query(DagModel.dag_id).filter(
           or_(
               DagModel.dag_id == dag_id,
               and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag),
           )
       )
       dags_to_delete = [dag_id for dag_id, in dags_to_delete_query]
   ```
   
   Would this be better?







[GitHub] [airflow] potiuk commented on pull request #20758: Avoid unintentional data loss when deleting DAGs

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #20758:
URL: https://github.com/apache/airflow/pull/20758#issuecomment-1008559497


   > We encountered some data loss today due to a user deleting a DAG from the UI called `project.load`, which then deleted all of the history from other DAGs called `project.load.bigquery` and `project.load.trino`, which also caused them to run unexpectedly due to the resetting of run history.
   
   Side-comment: I was almost sure that a dag_id cannot contain "." (precisely because of the subdag convention), but now I see this is not the case :). We excluded '.' for task groups (because task group ids are also '.'-separated) but not for the task ids:
   
   ```
   KEY_REGEX = re.compile(r'^[\w.-]+$')
   GROUP_KEY_REGEX = re.compile(r'^[\w-]+$')
   ```
   
   Nice catch.
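
A quick way to see the difference between the two patterns quoted above is to run both against a dotted id. This is only a sketch: the regexes are copied from the comment, while the sample ids and variable names are made up for illustration.

```python
import re

# The two patterns quoted above, copied verbatim.
KEY_REGEX = re.compile(r'^[\w.-]+$')
GROUP_KEY_REGEX = re.compile(r'^[\w-]+$')

# Hypothetical sample ids.
dotted_id = "project.load.bigquery"
plain_id = "project_load"

# KEY_REGEX has "." in its character class, so a dotted id is accepted.
dotted_ok_as_key = bool(KEY_REGEX.match(dotted_id))

# GROUP_KEY_REGEX has no ".", so the same id is rejected as a group key.
dotted_ok_as_group = bool(GROUP_KEY_REGEX.match(dotted_id))
plain_ok_as_group = bool(GROUP_KEY_REGEX.match(plain_id))

print(dotted_ok_as_key, dotted_ok_as_group, plain_ok_as_group)
```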
   





[GitHub] [airflow] kaxil commented on a change in pull request #20758: Avoid unintentional data loss when deleting DAGs

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #20758:
URL: https://github.com/apache/airflow/pull/20758#discussion_r781813386



##########
File path: airflow/api/common/delete_dag.py
##########
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
+    # deleting a DAG should also delete all of its subdags
+    dags_to_delete_query = session.query(DagModel.dag_id).filter(
+        or_(
+            DagModel.dag_id == dag_id,
+            and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag),
+        )
+    )
+    dags_to_delete = [dag_id for dag_id, in dags_to_delete_query]

Review comment:
       ```python
           or_(
               DagModel.dag_id == dag_id,
               DagModel.root_dag_id == dag_id,
           )
   ```
   
   might have also worked @SamWheating 
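
As a rough illustration of this alternative, the sketch below uses the standard-library `sqlite3` module instead of Airflow's models. It assumes that SubDAG rows store their parent's id in `root_dag_id` and that ordinary DAGs leave it empty; the table layout and sample rows are hypothetical.

```python
import sqlite3

# Hypothetical stand-in for the DAG table; root_dag_id is assumed to be
# NULL for ordinary DAGs and to hold the parent's id for SubDAGs.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, root_dag_id TEXT)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?)",
    [
        ("project.load", None),
        ("project.load.bigquery", None),             # unrelated DAG, same prefix
        ("project.load.section_1", "project.load"),  # SubDAG of project.load
    ],
)

target = "project.load"

# Select the DAG itself plus any row that names it as its root,
# without relying on the dotted naming convention at all.
rows = conn.execute(
    "SELECT dag_id FROM dag WHERE dag_id = ? OR root_dag_id = ? ORDER BY dag_id",
    (target, target),
)
selected = [dag_id for (dag_id,) in rows]

print(selected)
```

Under that assumption the unrelated `project.load.bigquery` row is left alone because its `root_dag_id` is empty.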







[GitHub] [airflow] SamWheating commented on a change in pull request #20758: Avoid unintentional data loss when deleting DAGs

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #20758:
URL: https://github.com/apache/airflow/pull/20758#discussion_r780836809



##########
File path: airflow/api/common/delete_dag.py
##########
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
+    # deleting a DAG should also all of its subdags
+    dags_to_delete = [
+        d[0]
+        for d in session.query(DagModel.dag_id)
+        .filter(and_(DagModel.dag_id.like(dag_id + ".%"), DagModel.is_subdag))
+        .all()
+    ]
+    dags_to_delete.append(dag_id)

Review comment:
       Yeah, I think so - would `dags_to_delete` be a list of strings here, or a list of one-element tuples? I always get mixed up with the return types from SQLAlchemy queries 🤔










[GitHub] [airflow] uranusjr commented on a change in pull request #20758: Avoid unintentional data loss when deleting DAGs

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #20758:
URL: https://github.com/apache/airflow/pull/20758#discussion_r780861576



##########
File path: airflow/api/common/delete_dag.py
##########
@@ -54,6 +54,15 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
     if dag is None:
         raise DagNotFound(f"Dag id {dag_id} not found")
 
+    # deleting a DAG should also all of its subdags
+    dags_to_delete = [
+        d[0]
+        for d in session.query(DagModel.dag_id)
+        .filter(and_(DagModel.dag_id.like(dag_id + ".%"), DagModel.is_subdag))
+        .all()
+    ]
+    dags_to_delete.append(dag_id)

Review comment:
       `list(dags_to_delete_query)` would have been a list of one-tuples, but the extra comma after `dag_id` unpacks each tuple and turns `dags_to_delete` into a list of strings.
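
The trailing-comma unpacking described above can be tried with plain tuples. This is a minimal sketch: `rows` is a hypothetical stand-in for the rows a single-column query yields, not real SQLAlchemy output.

```python
# Rows shaped like the result of a single-column query: one 1-tuple per row.
rows = [("project.load",), ("project.load.section_1",)]

# Without unpacking, each element is still a one-element tuple.
as_tuples = list(rows)

# "dag_id," is a one-element tuple target, so each row is unpacked
# and the comprehension collects bare strings.
as_strings = [dag_id for dag_id, in rows]

# Equivalent, more explicit spelling using indexing.
also_strings = [row[0] for row in rows]

print(as_tuples)
print(as_strings)
```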







[GitHub] [airflow] github-actions[bot] commented on pull request #20758: Avoid unintentional data loss when deleting DAGs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #20758:
URL: https://github.com/apache/airflow/pull/20758#issuecomment-1008482384


   The PR is likely OK to be merged with just a subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] potiuk merged pull request #20758: Avoid unintentional data loss when deleting DAGs

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #20758:
URL: https://github.com/apache/airflow/pull/20758


   




