Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/01 09:33:43 UTC

[GitHub] [airflow] SamWheating commented on a change in pull request #17891: Show error in UI when a DAG with same dag_id as another DAG is present

SamWheating commented on a change in pull request #17891:
URL: https://github.com/apache/airflow/pull/17891#discussion_r699562107



##########
File path: airflow/models/dagbag.py
##########
@@ -466,6 +467,34 @@ def _bag_dag(self, *, dag, root_dag, recursive):
                         del self.dags[subdag.dag_id]
             raise
 
+    @provide_session
+    def _check_if_duplicate(self, dag, session=None):
+        """
+        Checks if a DAG with the same ID already exists.
+        If present, returns the fileloc of the existing DAG.
+        """
+        from airflow.models.serialized_dag import SerializedDagModel  # Avoid circular import
+
+        other_dag = session.query(SerializedDagModel).filter(
+            SerializedDagModel.dag_id == dag.dag_id,
+            SerializedDagModel.fileloc_hash != DagCode.dag_fileloc_hash(dag.fileloc)
+        ).first()

Review comment:
       In the unlikely event that there are more than two instances of a single DAG name, this is only going to alert on one of the collisions, correct?
   
   Do you think it's worthwhile to handle this case and alert on all of the collisions, or would it be too much added complexity for such a rare failure?
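   
   For illustration, something like this (an untested sketch that reuses the names from the diff above, inside the same method context) would collect every colliding fileloc in one query so they can all be reported at once:
   
       other_dags = session.query(SerializedDagModel).filter(
           SerializedDagModel.dag_id == dag.dag_id,
           SerializedDagModel.fileloc_hash != DagCode.dag_fileloc_hash(dag.fileloc),
       ).all()
       if other_dags:
           # Every other file that still exists and (presumably) still defines this dag_id
           filelocs = sorted(d.fileloc for d in other_dags if os.path.exists(d.fileloc))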

##########
File path: airflow/models/dagbag.py
##########
@@ -466,6 +467,34 @@ def _bag_dag(self, *, dag, root_dag, recursive):
                         del self.dags[subdag.dag_id]
             raise
 
+    @provide_session
+    def _check_if_duplicate(self, dag, session=None):
+        """
+        Checks if a DAG with the same ID already exists.
+        If present, returns the fileloc of the existing DAG.
+        """
+        from airflow.models.serialized_dag import SerializedDagModel  # Avoid circular import
+
+        other_dag = session.query(SerializedDagModel).filter(
+            SerializedDagModel.dag_id == dag.dag_id,
+            SerializedDagModel.fileloc_hash != DagCode.dag_fileloc_hash(dag.fileloc)
+        ).first()
+        if other_dag:
+            # If a DAG was just moved from one file to another, this condition can match even
+            # though there is no real duplicate. The heuristics below try to avoid a false-positive alert.
+            if not os.path.exists(other_dag.fileloc):
+                # Other file is no more, nothing to worry about
+                return None
+            # Not feasible here to parse that other file to check if it still has the DAG,
+            # so resort to just checking the task ids
+            task_ids = [task.task_id for task in dag.tasks]
+            other_dag_task_ids = [task['label'] for task in other_dag.data['dag']['tasks']]
+            if task_ids != other_dag_task_ids:

Review comment:
       If these two lists are built from the same DAG, will they always be in the same order? If not, the comparison isn't going to be accurate.
   
   If they aren't in a deterministic order, maybe we could sort them or cast them to a set before comparing?
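   
   As a sketch of what I mean (untested, reusing the names from the diff), casting to sets makes the comparison order-insensitive:
   
       task_ids = {task.task_id for task in dag.tasks}
       other_dag_task_ids = {task['label'] for task in other_dag.data['dag']['tasks']}
       if task_ids != other_dag_task_ids:
           ...
   
   Note that sets also ignore duplicates, so if repeated task IDs within a DAG were ever a concern, sorting both lists before comparing would be the stricter check.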



