You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "jedcunningham (via GitHub)" <gi...@apache.org> on 2023/07/05 15:55:17 UTC

[GitHub] [airflow] jedcunningham commented on a diff in pull request #32235: Refactor SetupTeardownContext Manager

jedcunningham commented on code in PR #32235:
URL: https://github.com/apache/airflow/pull/32235#discussion_r1253173736


##########
airflow/utils/setup_teardown.py:
##########
@@ -63,29 +65,37 @@ def pop_context_managed_setup_task(cls) -> Operator | list[Operator] | None:
                 else:
                     setup_task.set_downstream(old_setup_task)
         else:
-            cls._context_managed_setup_task = None
+            cls._context_managed_setup_task = []
         return old_setup_task
 
     @classmethod
-    def update_context_map(cls, operator):
-        ctx = BaseSetupTeardownContext.context_map
-        if setup_task := BaseSetupTeardownContext.get_context_managed_setup_task():
-            if isinstance(setup_task, list):
-                setup_task = tuple(setup_task)
-            if ctx.get(setup_task) is None:
-                ctx[setup_task] = [operator]
+    def update_context_map(cls, task: DependencyMixin):
+        from airflow.models.abstractoperator import AbstractOperator
+
+        task_ = cast(AbstractOperator, task)
+        if task_.is_setup or task_.is_teardown:
+            return
+        ctx = cls.context_map
+
+        def _get_or_set_item(item):
+            if ctx.get(item) is None:
+                ctx[item] = [task_]
             else:
-                ctx[setup_task].append(operator)
-        if teardown_task := BaseSetupTeardownContext.get_context_managed_teardown_task():
+                ctx[item].append(task_)
+
+        if setup_task := cls.get_context_managed_setup_task():
+            if isinstance(setup_task, list):
+                _get_or_set_item(tuple(setup_task))
+            elif not isinstance(setup_task, list):
+                _get_or_set_item(setup_task)
+        if teardown_task := cls.get_context_managed_teardown_task():
             if isinstance(teardown_task, list):
-                teardown_task = tuple(teardown_task)
-            if ctx.get(teardown_task) is None:
-                ctx[teardown_task] = [operator]
-            else:
-                ctx[teardown_task].append(operator)
+                _get_or_set_item(tuple(teardown_task))
+            elif not isinstance(teardown_task, list):

Review Comment:
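   Should this `elif not isinstance(teardown_task, list)` also just be an `else`? The preceding `if` already checks `isinstance(teardown_task, list)`, so the second condition is redundant (same question as for the setup branch).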



##########
airflow/utils/setup_teardown.py:
##########
@@ -63,29 +65,37 @@ def pop_context_managed_setup_task(cls) -> Operator | list[Operator] | None:
                 else:
                     setup_task.set_downstream(old_setup_task)
         else:
-            cls._context_managed_setup_task = None
+            cls._context_managed_setup_task = []
         return old_setup_task
 
     @classmethod
-    def update_context_map(cls, operator):
-        ctx = BaseSetupTeardownContext.context_map
-        if setup_task := BaseSetupTeardownContext.get_context_managed_setup_task():
-            if isinstance(setup_task, list):
-                setup_task = tuple(setup_task)
-            if ctx.get(setup_task) is None:
-                ctx[setup_task] = [operator]
+    def update_context_map(cls, task: DependencyMixin):
+        from airflow.models.abstractoperator import AbstractOperator
+
+        task_ = cast(AbstractOperator, task)
+        if task_.is_setup or task_.is_teardown:
+            return
+        ctx = cls.context_map
+
+        def _get_or_set_item(item):
+            if ctx.get(item) is None:
+                ctx[item] = [task_]
             else:
-                ctx[setup_task].append(operator)
-        if teardown_task := BaseSetupTeardownContext.get_context_managed_teardown_task():
+                ctx[item].append(task_)
+
+        if setup_task := cls.get_context_managed_setup_task():
+            if isinstance(setup_task, list):
+                _get_or_set_item(tuple(setup_task))
+            elif not isinstance(setup_task, list):

Review Comment:
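   Should this just be an `else`? The preceding `if` already checks `isinstance(setup_task, list)`, so `elif not isinstance(setup_task, list)` is redundant.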



##########
airflow/utils/setup_teardown.py:
##########
@@ -97,70 +107,64 @@ def pop_context_managed_teardown_task(cls) -> Operator | list[Operator] | None:
                 else:
                     teardown_task.set_upstream(old_teardown_task)
         else:
-            cls._context_managed_teardown_task = None
+            cls._context_managed_teardown_task = []
         return old_teardown_task
 
     @classmethod
-    def get_context_managed_setup_task(cls) -> Operator | list[Operator] | None:
+    def get_context_managed_setup_task(cls) -> AbstractOperator | list[AbstractOperator]:
         return cls._context_managed_setup_task
 
     @classmethod
-    def get_context_managed_teardown_task(cls) -> Operator | list[Operator] | None:
+    def get_context_managed_teardown_task(cls) -> AbstractOperator | list[AbstractOperator]:
         return cls._context_managed_teardown_task
 
     @classmethod
-    def push_setup_teardown_task(cls, operator: Operator | list[Operator]):
+    def push_setup_teardown_task(cls, operator: AbstractOperator | list[AbstractOperator]):
         if isinstance(operator, list):
-            first_task: Operator = operator[0]
-            if first_task.is_teardown:
-                if not all(task.is_teardown == first_task.is_teardown for task in operator):
-                    raise ValueError("All tasks in the list must be either setup or teardown tasks")
-                upstream_tasks = first_task.upstream_list
-                for task in upstream_tasks:
-                    if not task.is_setup and not task.is_teardown:
-                        raise ValueError(
-                            "All upstream tasks in the context manager must be a setup or teardown task"
-                        )
-                BaseSetupTeardownContext.push_context_managed_teardown_task(operator)
-                upstream_setup: list[Operator] = [task for task in upstream_tasks if task.is_setup]
-                if upstream_setup:
-                    BaseSetupTeardownContext.push_context_managed_setup_task(upstream_setup)
-            elif first_task.is_setup:
-                if not all(task.is_setup == first_task.is_setup for task in operator):
-                    raise ValueError("All tasks in the list must be either setup or teardown tasks")
-                for task in first_task.upstream_list:
-                    if not task.is_setup and not task.is_teardown:
-                        raise ValueError(
-                            "All upstream tasks in the context manager must be a setup or teardown task"
-                        )
-                BaseSetupTeardownContext.push_context_managed_setup_task(operator)
-                downstream_teardown: list[Operator] = [
-                    task for task in first_task.downstream_list if task.is_teardown
-                ]
-                if downstream_teardown:
-                    BaseSetupTeardownContext.push_context_managed_teardown_task(downstream_teardown)
+            if operator[0].is_teardown:
+                cls._push_tasks(operator)
+            elif operator[0].is_setup:
+                cls._push_tasks(operator, setup=True)
         elif operator.is_teardown:
-            upstream_tasks = operator.upstream_list
-            for task in upstream_tasks:
-                if not task.is_setup and not task.is_teardown:
-                    raise ValueError(
-                        "All upstream tasks in the context manager must be a setup or teardown task"
-                    )
-            BaseSetupTeardownContext.push_context_managed_teardown_task(operator)
-            upstream_setup = [task for task in upstream_tasks if task.is_setup]
-            if upstream_setup:
-                BaseSetupTeardownContext.push_context_managed_setup_task(upstream_setup)
+            cls._push_tasks(operator)
         elif operator.is_setup:
-            for task in operator.upstream_list:
-                if not task.is_setup and not task.is_teardown:
-                    raise ValueError(
-                        "All upstream tasks in the context manager must be a setup or teardown task"
+            cls._push_tasks(operator, setup=True)
+        cls.active = True
+
+    @classmethod
+    def _push_tasks(cls, operator: AbstractOperator | list[AbstractOperator], setup: bool = False):
+        if isinstance(operator, list):
+            upstream_tasks = operator[0].upstream_list
+            downstream_list = operator[0].downstream_list
+            if not all(task.is_setup == operator[0].is_setup for task in operator):
+                cls.error("All tasks in the list must be either setup or teardown tasks")

Review Comment:
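   We aren't checking the teardown side any longer? The old code compared `task.is_teardown == first_task.is_teardown` when the first task was a teardown, but this check only compares `is_setup`.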



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org