You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/12/16 10:01:59 UTC

[GitHub] [airflow] ashb commented on a change in pull request #6633: [AIRFLOW-2279] Clear tasks across DAGs if marked by ExternalTaskMarker

ashb commented on a change in pull request #6633: [AIRFLOW-2279] Clear tasks across DAGs if marked by ExternalTaskMarker
URL: https://github.com/apache/airflow/pull/6633#discussion_r358142948
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -161,3 +163,56 @@ def poke(self, context, session=None):
 
         session.commit()
         return count == len(dttm_filter)
+
+
+class ExternalTaskMarker(BaseOperator):
+    """
+    Use this operator to indicate that a task on a different DAG depends on this task.
+    When this task is cleared with "Recursive" selected, Airflow will clear the task on
+    the other DAG and its downstream tasks recursively. Transitive dependencies are followed
+    until the recursion_depth is reached.
+    """
+    template_fields = ['external_dag_id', 'external_task_id', 'execution_date']
+    ui_color = '#19647e'
+
+    @apply_defaults
+    def __init__(self,
+                 external_dag_id,
+                 external_task_id,
+                 execution_date: Optional[Union[str, datetime.datetime]] = "{{ execution_date.isoformat() }}",
+                 recursion_depth: int = 10,
+                 *args,
+                 **kwargs):
+        """
+        :param external_dag_id: The dag_id that contains the dependent task that needs to be cleared.
+        :type external_dag_id: str
+        :param external_task_id: The task_id of the dependent task that needs to be cleared.
+        :type external_task_id: str
+        :param execution_date: The execution_date of the dependent task that needs to be cleared.
+        :type execution_date: str or datetime.datetime
+        :param recursion_depth: The maximum level of transitive dependencies allowed. Default is 10.
+            This is mostly used for preventing cyclic dependencies. It is fine to increase
+            this number if necessary. However, too many levels of transitive dependencies will make
+            it slower to clear tasks in the web UI.
+        """
+        super().__init__(*args, **kwargs)
+
+        self.external_dag_id = external_dag_id
+        self.external_task_id = external_task_id
+        if isinstance(execution_date, datetime.datetime):
+            self.execution_date = execution_date.isoformat()
+        elif isinstance(execution_date, str):
+            self.execution_date = execution_date
+        else:
+            raise TypeError('Expected str or datetime.datetime type for execution_date. Got {}'
+                            .format(type(execution_date)))
+        if recursion_depth <= 0:
+            raise ValueError("recursion_depth should be a positive integer")
+        self.recursion_depth = recursion_depth
+
+    def execute(self, context):
+        """
+        Since the only purpose of this operator is to indicate a dependency on an external DAG, this
+        method is a no-op.
 
 Review comment:
   This should subclass DummyOperator then (because in future we will likely make the scheduler never bother trying to execute DummyOperator tasks.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services