You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/29 18:41:58 UTC

[GitHub] [airflow] dstandish commented on a change in pull request #19683: In DAG dependency detector, use class type instead of class name, related #19582

dstandish commented on a change in pull request #19683:
URL: https://github.com/apache/airflow/pull/19683#discussion_r758624943



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1233,6 +1233,67 @@ def test_deps_sorted(self):
             'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
         ]
 
+    def test_derived_dag_deps(self):
+        """
+        Tests DAG dependency detection, including derived classes
+        """
+        from airflow.operators.dummy import DummyOperator
+        from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        class DerivedSensor(ExternalTaskSensor):
+            template_fields = ['external_dag_id', 'run_id']
+
+            def __init__(self, *, external_dag_id: str, run_id: str = None, **kwargs):
+                super().__init__(
+                    external_dag_id=external_dag_id, execution_delta=None, execution_date_fn=None, **kwargs
+                )
+                self.run_id = run_id
+
+        execution_date = datetime(2020, 1, 1)
+        for Sensor in [ExternalTaskSensor, DerivedSensor]:
+            with DAG(dag_id="test_derived_dag_deps_sensor", start_date=execution_date) as dag:
+                task1 = Sensor(
+                    task_id="task1",
+                    external_dag_id="external_dag_id",
+                    mode="reschedule",
+                )
+                task2 = DummyOperator(task_id="task2")
+                task1 >> task2
+
+            dag = SerializedDAG.to_dict(dag)
+            assert dag['dag']['dag_dependencies'] == [
+                {
+                    'source': 'external_dag_id',
+                    'target': 'test_derived_dag_deps_sensor',
+                    'dependency_type': 'sensor',
+                    'dependency_id': 'task1',
+                }
+            ]
+
+        class DerivedTrigger(TriggerDagRunOperator):

Review comment:
       Please call this `DerivedOperator` instead of `DerivedTrigger`.
   
   `Trigger` is already a distinct concept in Airflow, so it's best not to use that name here (it could be confusing and produce false positives when searching the codebase).
   
   ```suggestion
           class DerivedOperator(TriggerDagRunOperator):
   ```

##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1233,6 +1233,67 @@ def test_deps_sorted(self):
             'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
         ]
 
+    def test_derived_dag_deps(self):
+        """
+        Tests DAG dependency detection, including derived classes
+        """
+        from airflow.operators.dummy import DummyOperator
+        from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        class DerivedSensor(ExternalTaskSensor):
+            template_fields = ['external_dag_id', 'run_id']
+
+            def __init__(self, *, external_dag_id: str, run_id: str = None, **kwargs):
+                super().__init__(
+                    external_dag_id=external_dag_id, execution_delta=None, execution_date_fn=None, **kwargs
+                )
+                self.run_id = run_id
+
+        execution_date = datetime(2020, 1, 1)
+        for Sensor in [ExternalTaskSensor, DerivedSensor]:

Review comment:
       ```suggestion
           for class_ in [ExternalTaskSensor, DerivedSensor]:
   ```

##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1233,6 +1233,67 @@ def test_deps_sorted(self):
             'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
         ]
 
+    def test_derived_dag_deps(self):
+        """
+        Tests DAG dependency detection, including derived classes
+        """
+        from airflow.operators.dummy import DummyOperator
+        from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        class DerivedSensor(ExternalTaskSensor):
+            template_fields = ['external_dag_id', 'run_id']
+
+            def __init__(self, *, external_dag_id: str, run_id: str = None, **kwargs):
+                super().__init__(
+                    external_dag_id=external_dag_id, execution_delta=None, execution_date_fn=None, **kwargs
+                )
+                self.run_id = run_id
+
+        execution_date = datetime(2020, 1, 1)
+        for Sensor in [ExternalTaskSensor, DerivedSensor]:
+            with DAG(dag_id="test_derived_dag_deps_sensor", start_date=execution_date) as dag:
+                task1 = Sensor(
+                    task_id="task1",
+                    external_dag_id="external_dag_id",
+                    mode="reschedule",
+                )
+                task2 = DummyOperator(task_id="task2")
+                task1 >> task2
+
+            dag = SerializedDAG.to_dict(dag)
+            assert dag['dag']['dag_dependencies'] == [
+                {
+                    'source': 'external_dag_id',
+                    'target': 'test_derived_dag_deps_sensor',
+                    'dependency_type': 'sensor',
+                    'dependency_id': 'task1',
+                }
+            ]
+
+        class DerivedTrigger(TriggerDagRunOperator):
+            def __init__(self, **kwargs):
+                super().__init__(**kwargs)
+
+        for Trigger in [TriggerDagRunOperator, DerivedTrigger]:

Review comment:
       ```suggestion
           for class_ in [TriggerDagRunOperator, DerivedTrigger]:
   ```

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -444,14 +446,14 @@ class DependencyDetector:
     @staticmethod
     def detect_task_dependencies(task: BaseOperator) -> Optional['DagDependency']:
         """Detects dependencies caused by tasks"""
-        if task.task_type == "TriggerDagRunOperator":
+        if isinstance(task, TriggerDagRunOperator):

Review comment:
       @kaxil, do you think there should perhaps be an attribute on the task sensor (e.g. a class attribute, which other implementations could also set) that we check instead of the class itself? That way, other task-sensor-like operators that do not inherit from `ExternalTaskSensor` could still declare DAG dependencies.
   
   It then needn't be a subclass at all. Maybe this is a bit too magical (I'm just thinking aloud), but perhaps we could inspect the operator's arguments to infer whether a `DagDependency` should be created?

##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1233,6 +1233,67 @@ def test_deps_sorted(self):
             'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
         ]
 
+    def test_derived_dag_deps(self):
+        """
+        Tests DAG dependency detection, including derived classes
+        """
+        from airflow.operators.dummy import DummyOperator
+        from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        class DerivedSensor(ExternalTaskSensor):
+            template_fields = ['external_dag_id', 'run_id']
+
+            def __init__(self, *, external_dag_id: str, run_id: str = None, **kwargs):
+                super().__init__(
+                    external_dag_id=external_dag_id, execution_delta=None, execution_date_fn=None, **kwargs
+                )
+                self.run_id = run_id
+
+        execution_date = datetime(2020, 1, 1)
+        for Sensor in [ExternalTaskSensor, DerivedSensor]:
+            with DAG(dag_id="test_derived_dag_deps_sensor", start_date=execution_date) as dag:
+                task1 = Sensor(
+                    task_id="task1",
+                    external_dag_id="external_dag_id",
+                    mode="reschedule",
+                )
+                task2 = DummyOperator(task_id="task2")
+                task1 >> task2
+
+            dag = SerializedDAG.to_dict(dag)
+            assert dag['dag']['dag_dependencies'] == [
+                {
+                    'source': 'external_dag_id',
+                    'target': 'test_derived_dag_deps_sensor',
+                    'dependency_type': 'sensor',
+                    'dependency_id': 'task1',
+                }
+            ]
+
+        class DerivedTrigger(TriggerDagRunOperator):
+            def __init__(self, **kwargs):
+                super().__init__(**kwargs)

Review comment:
       You can omit `__init__` when you don't change it; the subclass simply inherits the parent's constructor.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org