Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/03 22:36:47 UTC

[GitHub] [airflow] jedcunningham commented on a diff in pull request #25521: Fix dag dependencies detection

jedcunningham commented on code in PR #25521:
URL: https://github.com/apache/airflow/pull/25521#discussion_r937186897


##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
                 }
             ]
 
+    @conf_vars(
+        {
+            (
+                'scheduler',
+                'dependency_detector',
+            ): 'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+        }
+    )
+    def test_custom_dep_detector(self):
+        """
+        Prior to deprecation of custom dependency detector, the return type as Optional[DagDependency].

Review Comment:
   ```suggestion
           Prior to deprecation of custom dependency detector, the return type was Optional[DagDependency].
   ```
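The docstring under review describes custom detectors written against the older `Optional[DagDependency]` return type. Below is a rough sketch of the kind of compatibility shim such a test exercises. It assumes a hypothetical frozen `DagDependency` stand-in and a hypothetical helper `normalize_detector_result`. Neither is Airflow's actual code.

```python
from dataclasses import dataclass
from typing import Iterable, Set, Union


@dataclass(frozen=True)
class DagDependency:
    """Hypothetical stand-in for Airflow's DagDependency; frozen so it is hashable."""

    source: str
    target: str
    dependency_type: str
    dependency_id: str


# Hypothetical union of what old-style and new-style detectors might return.
DetectorResult = Union[None, DagDependency, Iterable[DagDependency]]


def normalize_detector_result(result: DetectorResult) -> Set[DagDependency]:
    """Accept both the legacy Optional[DagDependency] return type and an iterable."""
    if result is None:
        # Legacy detectors returned None when a task had no cross-DAG dependency.
        return set()
    if isinstance(result, DagDependency):
        # Legacy detectors returned a single dependency object.
        return {result}
    # Newer-style detectors return an iterable; a set also drops duplicates.
    return set(result)
```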



##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
                 }
             ]
 
+    @conf_vars(
+        {
+            (
+                'scheduler',
+                'dependency_detector',
+            ): 'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+        }
+    )
+    def test_custom_dep_detector(self):
+        """
+        Prior to deprecation of custom dependency detector, the return type as Optional[DagDependency].
+        This class verifies that custom dependency detector classes which assume that return type will still
+        work until support for them is removed in 3.0.
+
+        TODO: remove in Airflow 3.0
+        """
+        from airflow.operators.empty import EmptyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test", start_date=execution_date) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = EmptyOperator(task_id="task2")
+            task1 >> task2

Review Comment:
   ```suggestion
   ```
   
   Not sure we need these for this test?



##########
airflow/serialization/serialized_objects.py:
##########
@@ -1048,14 +1058,11 @@ def serialize_dag(cls, dag: DAG) -> dict:
                 del serialized_dag["timetable"]
 
             serialized_dag["tasks"] = [cls._serialize(task) for _, task in dag.task_dict.items()]
-            dag_deps = [
-                t.__dict__
-                for task in dag.task_dict.values()
-                for t in SerializedBaseOperator.detect_dependencies(task)
-                if t is not None
-            ]
-
-            serialized_dag["dag_dependencies"] = dag_deps
+            dag_deps = {
+                d for t in dag.task_dict.values() for d in SerializedBaseOperator.detect_dependencies(t)

Review Comment:
   ```suggestion
                   dep for task in dag.task_dict.values() for dep in SerializedBaseOperator.detect_dependencies(task)
   ```
   
   Might be worth using better variable names here.
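As a minimal sketch of the renamed comprehension, consider the following. It assumes a hypothetical `DagDependency` stand-in (frozen so instances can go in a set, ordered so they can be sorted) and a hypothetical `collect_dag_dependencies` helper that receives per-task dependencies directly instead of calling `SerializedBaseOperator.detect_dependencies`.

```python
from dataclasses import asdict, dataclass
from typing import Dict, Iterable, List


@dataclass(frozen=True, order=True)
class DagDependency:
    """Hypothetical stand-in for Airflow's DagDependency (frozen: hashable, order: sortable)."""

    source: str
    target: str
    dependency_type: str
    dependency_id: str


def collect_dag_dependencies(
    deps_by_task: Dict[str, Iterable[DagDependency]],
) -> List[Dict[str, str]]:
    """Merge per-task dependencies into a de-duplicated, deterministically ordered list."""
    # Descriptive names: each dep, for each task's deps, for each dep that task reports.
    unique_deps = {dep for task_deps in deps_by_task.values() for dep in task_deps}
    # A set has no stable iteration order, so sort before serializing to dicts.
    return [asdict(dep) for dep in sorted(unique_deps)]


shared = DagDependency("test", "downstream_dag", "trigger", "d2")
result = collect_dag_dependencies(
    {
        "task1": [DagDependency("external_dag_id", "test", "sensor", "task1"), shared],
        "task2": [shared],  # the same dependency reported by a second task
    }
)
```

Sorting here plays the same role as the `sorted(..., key=lambda x: tuple(x.values()))` calls in the test, which compare the serialized dependencies without relying on set iteration order.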



##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
                 }
             ]
 
+    @conf_vars(
+        {
+            (
+                'scheduler',
+                'dependency_detector',
+            ): 'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+        }
+    )
+    def test_custom_dep_detector(self):
+        """
+        Prior to deprecation of custom dependency detector, the return type as Optional[DagDependency].
+        This class verifies that custom dependency detector classes which assume that return type will still
+        work until support for them is removed in 3.0.
+
+        TODO: remove in Airflow 3.0
+        """
+        from airflow.operators.empty import EmptyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test", start_date=execution_date) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = EmptyOperator(task_id="task2")
+            task1 >> task2
+            CustomDepOperator(task_id='hello', bash_command='hi')
+            dag = SerializedDAG.to_dict(dag)
+            assert sorted(dag['dag']['dag_dependencies'], key=lambda x: tuple(x.values())) == sorted(
+                [
+                    {
+                        'source': 'external_dag_id',
+                        'target': 'test',
+                        'dependency_type': 'sensor',
+                        'dependency_id': 'task1',
+                    },
+                    {
+                        'source': 'test',
+                        'target': 'nothing',
+                        'dependency_type': 'abc',
+                        'dependency_id': 'hello',
+                    },
+                ],
+                key=lambda x: tuple(x.values()),
+            )
+
+    def test_dag_deps_datasets(self):
+        """
+        Check that dag_dependencies node is populated correctly for a DAG with datasets.
+        """
+        from airflow.operators.empty import EmptyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        d1 = Dataset('d1')
+        d2 = Dataset('d2')
+        d3 = Dataset('d2')

Review Comment:
   Might be worth using a different name here; it took me a minute to work out that d3.uri _should_ == "d2".
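The purpose of `d3` appears to be a second, distinct dataset object that shares `d2`'s URI, presumably so the test can check that it is de-duplicated. Here is a minimal sketch of that idea. It uses a hypothetical `FakeDataset` stand-in with URI-based equality rather than Airflow's `Dataset`, and `d2_duplicate` is only an illustrative name.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeDataset:
    """Hypothetical stand-in for a dataset whose equality and hash depend only on its URI."""

    uri: str


d1 = FakeDataset("d1")
d2 = FakeDataset("d2")
# A name that signals intent: deliberately a second object with d2's URI.
d2_duplicate = FakeDataset("d2")
d4 = FakeDataset("d4")

# Two distinct objects that compare equal, so they collapse to one entry in a set.
assert d2_duplicate is not d2
assert d2_duplicate == d2
assert len({d1, d2, d2_duplicate, d4}) == 3
```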



##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
                 }
             ]
 
+    @conf_vars(
+        {
+            (
+                'scheduler',
+                'dependency_detector',
+            ): 'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+        }
+    )
+    def test_custom_dep_detector(self):
+        """
+        Prior to deprecation of custom dependency detector, the return type as Optional[DagDependency].
+        This class verifies that custom dependency detector classes which assume that return type will still
+        work until support for them is removed in 3.0.
+
+        TODO: remove in Airflow 3.0
+        """
+        from airflow.operators.empty import EmptyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test", start_date=execution_date) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = EmptyOperator(task_id="task2")
+            task1 >> task2
+            CustomDepOperator(task_id='hello', bash_command='hi')
+            dag = SerializedDAG.to_dict(dag)
+            assert sorted(dag['dag']['dag_dependencies'], key=lambda x: tuple(x.values())) == sorted(
+                [
+                    {
+                        'source': 'external_dag_id',
+                        'target': 'test',
+                        'dependency_type': 'sensor',
+                        'dependency_id': 'task1',
+                    },
+                    {
+                        'source': 'test',
+                        'target': 'nothing',
+                        'dependency_type': 'abc',
+                        'dependency_id': 'hello',
+                    },
+                ],
+                key=lambda x: tuple(x.values()),
+            )
+
+    def test_dag_deps_datasets(self):
+        """
+        Check that dag_dependencies node is populated correctly for a DAG with datasets.
+        """
+        from airflow.operators.empty import EmptyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        d1 = Dataset('d1')
+        d2 = Dataset('d2')
+        d3 = Dataset('d2')
+        d4 = Dataset('d4')
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test", start_date=execution_date, schedule_on=[d1]) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = EmptyOperator(task_id="task2")
+            task1 >> task2

Review Comment:
   ```suggestion
   ```
   
   Same here.



-- 
This is an automated message from the Apache Git Service.