Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/09/25 03:16:38 UTC

[GitHub] [airflow] josh-fell opened a new pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

josh-fell opened a new pull request #17421:
URL: https://github.com/apache/airflow/pull/17421


   Closes: #7858
   
   Adding the ability to configure the `ShortCircuitOperator` to respect trigger rules for downstream tasks.  Currently this operator ignores all trigger rules and forcibly skips all downstream tasks.  However, there are use cases in which downstream tasks from the `ShortCircuitOperator` have trigger rules applied such that said tasks should execute even if upstream tasks are skipped by the operator (e.g. multiple branches that execute in parallel, one branch can be short-circuited at some point, and the branches converge).
   
   This PR adds a new parameter, `mode`, which allows users to have the `ShortCircuitOperator` perform a "hard short" (i.e. blindly skip all downstream tasks; the current behavior) or a "soft short" (i.e. skip only the immediate downstream task(s) and leave the Scheduler to handle the trigger rules of the remaining tasks).
   
   The unit tests for the `ShortCircuitOperator` were refactored as part of this change as well.
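
As a rough illustration of the two behaviors described above, the sketch below models a small DAG as a plain dictionary and computes which tasks each mode would mark as skipped. The task names, the `DAG` mapping, and the `tasks_to_skip` helper are hypothetical and are not part of Airflow; in the real operator the skipping goes through `SkipMixin.skip`.

```python
from typing import Dict, List, Set

# Hypothetical toy DAG: task_id -> direct downstream task_ids.
# Two branches run in parallel and converge on "join".
DAG: Dict[str, List[str]] = {
    "short_circuit": ["task_a"],
    "task_a": ["task_b"],
    "task_b": ["join"],
    "other_branch": ["join"],
    "join": [],
}


def all_downstream(dag: Dict[str, List[str]], task_id: str) -> Set[str]:
    """Collect every task reachable downstream of ``task_id``."""
    seen: Set[str] = set()
    stack = list(dag[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(dag[current])
    return seen


def tasks_to_skip(dag: Dict[str, List[str]], task_id: str, mode: str) -> Set[str]:
    """Return the tasks the operator itself would mark as skipped."""
    if mode == "hard":
        # Hard short: skip everything downstream, ignoring trigger rules.
        return all_downstream(dag, task_id)
    if mode == "soft":
        # Soft short: skip only direct children; the scheduler decides the rest.
        return set(dag[task_id])
    raise ValueError(f"Unknown mode: {mode!r}")


print(sorted(tasks_to_skip(DAG, "short_circuit", "hard")))
print(sorted(tasks_to_skip(DAG, "short_circuit", "soft")))
```

In the soft case `join` is not skipped by the operator itself, so whether it runs depends on its own `trigger_rule`.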
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r690337852



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +185,27 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    perform a "hard" or "soft" short.  In a "hard short", all downstream tasks are skipped without considering
+    the ``trigger_rule`` defined for tasks.  In "soft short", the ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
     condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    downstream tasks are marked with a state of "skipped" based on the short-circuiting mode configured. If
+    the condition is True, downstream tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param do_hard_short: If `True`, all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to `False`, downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type do_hard_short: bool
     """
 
+    def __init__(self, *, do_hard_short: bool = True, **kwargs) -> None:
+        super().__init__(**kwargs)
+        self.do_hard_short = do_hard_short

Review comment:
       Excellent, this is where I definitely wanted some feedback; been wavering on naming here. I like `mode`. I'll update. Thanks as always!







[GitHub] [airflow] eladkal commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r746071223



##########
File path: airflow/operators/python.py
##########
@@ -219,17 +219,31 @@ def execute(self, context: Dict):
 
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
-    Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
-
-    The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
-
-    The condition is determined by the result of `python_callable`.
+    Allows a workflow to continue only if a condition is met. Otherwise, the workflow "short-circuits" and
+    downstream tasks are skipped. The short-circuiting can be configured to either respect or ignore the
+    ``trigger_rule`` set for downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the
+    default setting, all downstream tasks are skipped without considering the ``trigger_rule`` defined for
+    tasks.  However, if this parameter is set to False, the direct, downstream tasks are skipped but the
+    specified ``trigger_rule`` for other subsequent downstream tasks is respected. In this mode,
+    the operator assumes the direct, downstream tasks were purposely meant to be skipped but perhaps
+    not other subsequent tasks.
+
+    The ShortCircuitOperator is derived from the PythonOperator. It evaluates a condition and short-circuits
+    the workflow if the condition is False. Any downstream tasks are marked with a state of "skipped" based
+    on the short-circuiting mode configured. If the condition is True, downstream tasks proceed as normal.
+
+    The condition is determined by the result of ``python_callable``.
+
+    :param ignore_downstream_trigger_rules: If set to True, all downstream tasks from this operator task will
+        be skipped. This is the default behavior. If set to False, the direct, downstream task(s) will be
+        skipped but the ``trigger_rule`` defined for other downstream tasks will be respected.
+    :type ignore_downstream_trigger_rules: bool

Review comment:
       It's probably also worth mentioning this in the operator docs?







[GitHub] [airflow] SamWheating commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r699568515



##########
File path: airflow/operators/python.py
##########
@@ -203,13 +216,26 @@ def execute(self, context: Dict):
             self.log.info('Proceeding with downstream tasks...')
             return
 
-        self.log.info('Skipping downstream tasks...')
-
         downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-        self.log.debug("Downstream task_ids %s", downstream_tasks)
+        self.log.debug("Downstream tasks %s", downstream_tasks)
 
         if downstream_tasks:
-            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
+            dag_run = context["dag_run"]
+            execution_date = context["ti"].execution_date
+
+            if self.mode == "hard":
+                self.log.info("Skipping downstream tasks using a hard short...")
+                self.skip(dag_run, execution_date, downstream_tasks)
+            elif self.mode == "soft":
+                self.log.info("Skipping downstream tasks using a soft short...")
+                # Explicitly setting the state of the direct, downstream task(s) to "skipped" and letting the
+                # Scheduler handle the remaining downstream task(s) appropriately.
+                self.skip(dag_run, execution_date, context["task"].get_direct_relatives(upstream=False))
+            else:

Review comment:
       Would it be preferable to perform this validation on initialization rather than on execution?
   
   In its current state, if an invalid value is provided for `mode`, this task will be submitted to the executor and fail, which could then lead to unintended execution of downstream tasks.
   
   If we validated this value and raised the exception in the `__init__` method, it would raise on import, which would prevent the broken code from ever reaching the executor.
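
A minimal sketch of the idea in this comment, assuming the `mode` parameter from the diff above. `ModeValidatedOperator` is a hypothetical stand-in rather than the real `ShortCircuitOperator`, so it runs without Airflow; the point is only that the check lives in `__init__`, which for a real operator executes when the DAG file is parsed.

```python
from typing import Tuple


class ModeValidatedOperator:
    """Hypothetical stand-in for an operator that accepts a ``mode`` argument."""

    # Allowed values, mirroring the "hard"/"soft" modes discussed in the review.
    modes: Tuple[str, ...] = ("hard", "soft")

    def __init__(self, *, task_id: str, mode: str = "hard") -> None:
        # Fail at construction time instead of inside execute().
        if mode not in self.modes:
            raise ValueError(
                f"Invalid mode {mode!r} for task {task_id!r}; expected one of {self.modes}"
            )
        self.task_id = task_id
        self.mode = mode

    def execute(self) -> str:
        # By the time this runs, self.mode is known to be valid.
        return f"{self.task_id}: performing a {self.mode} short"


try:
    ModeValidatedOperator(task_id="check", mode="medium")
except ValueError as err:
    print(f"Rejected at construction: {err}")
```

With this arrangement a typo in `mode` surfaces before any task instance is created, so no downstream task can run as a side effect of the failure.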







[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r691295269



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +190,28 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operate in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
+
+    The condition is determined by the result of ``python_callable``.
 
-    The condition is determined by the result of `python_callable`.
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to `False`, downstream tasks will be skipped but the ``trigger_rule``

Review comment:
       My fault. That's a silly mistake.  Nice catch!







[GitHub] [airflow] uranusjr commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r737011293



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operate in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       I think I'm OK with `ignore_downstream_trigger_rules` (a bit long but we have autosuggest in editors).







[GitHub] [airflow] potiuk edited a comment on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-986055227


   Let's see the tests after all the CI fixes are merged.





[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r691294438



##########
File path: airflow/operators/python.py
##########
@@ -26,6 +26,11 @@
 from textwrap import dedent
 from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
+try:
+    from typing import Literal
+except ImportError:
+    from typing_extensions import Literal

Review comment:
       Very cool. Thanks for the pro tip!








[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r746174444



##########
File path: airflow/operators/python.py
##########
@@ -219,17 +219,31 @@ def execute(self, context: Dict):
 
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
-    Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
-
-    The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
-
-    The condition is determined by the result of `python_callable`.
+    Allows a workflow to continue only if a condition is met. Otherwise, the workflow "short-circuits" and
+    downstream tasks are skipped. The short-circuiting can be configured to either respect or ignore the
+    ``trigger_rule`` set for downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the
+    default setting, all downstream tasks are skipped without considering the ``trigger_rule`` defined for
+    tasks.  However, if this parameter is set to False, the direct, downstream tasks are skipped but the
+    specified ``trigger_rule`` for other subsequent downstream tasks is respected. In this mode,
+    the operator assumes the direct, downstream tasks were purposely meant to be skipped but perhaps
+    not other subsequent tasks.
+
+    The ShortCircuitOperator is derived from the PythonOperator. It evaluates a condition and short-circuits
+    the workflow if the condition is False. Any downstream tasks are marked with a state of "skipped" based
+    on the short-circuiting mode configured. If the condition is True, downstream tasks proceed as normal.
+
+    The condition is determined by the result of ``python_callable``.
+
+    :param ignore_downstream_trigger_rules: If set to True, all downstream tasks from this operator task will
+        be skipped. This is the default behavior. If set to False, the direct, downstream task(s) will be
+        skipped but the ``trigger_rule`` defined for other downstream tasks will be respected.
+    :type ignore_downstream_trigger_rules: bool

Review comment:
       Oh yes, 100%. I'll add the necessary context to the docs.







[GitHub] [airflow] potiuk commented on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-986104153


   I think you need a clean-rebase on latest `main` @josh-fell 





[GitHub] [airflow] josh-fell closed pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell closed pull request #17421:
URL: https://github.com/apache/airflow/pull/17421


   





[GitHub] [airflow] SamWheating commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r699568515



##########
File path: airflow/operators/python.py
##########
@@ -203,13 +216,26 @@ def execute(self, context: Dict):
             self.log.info('Proceeding with downstream tasks...')
             return
 
-        self.log.info('Skipping downstream tasks...')
-
         downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-        self.log.debug("Downstream task_ids %s", downstream_tasks)
+        self.log.debug("Downstream tasks %s", downstream_tasks)
 
         if downstream_tasks:
-            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
+            dag_run = context["dag_run"]
+            execution_date = context["ti"].execution_date
+
+            if self.mode == "hard":
+                self.log.info("Skipping downstream tasks using a hard short...")
+                self.skip(dag_run, execution_date, downstream_tasks)
+            elif self.mode == "soft":
+                self.log.info("Skipping downstream tasks using a soft short...")
+                # Explicitly setting the state of the direct, downstream task(s) to "skipped" and letting the
+                # Scheduler handle the remaining downstream task(s) appropriately.
+                self.skip(dag_run, execution_date, context["task"].get_direct_relatives(upstream=False))
+            else:

Review comment:
       Would it be preferable to perform this validation on initialization rather than on execution?
   
   In its current state, if an invalid value is provided for `mode`, this task will be submitted to the executor and fail, which could then lead to unintended execution of downstream tasks.
   
   If we validated this value and raised the exception in the `__init__` method, it would raise on import, which would prevent the broken code from ever reaching the executor.








[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r728367726



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operate in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       Maybe even `ignore_downstream_trigger_rules` and boolean typed? (Not sure what the opinion is on literal vs. boolean param values, if there is one.) That makes the param actionable and maybe even easier to understand if it is True/False controlled. It's definitely a case of explicit is better than implicit, but the initial critique was that `do_hard_short` was a lengthy param name. @uranusjr WDYT?







[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r758728732



##########
File path: docs/apache-airflow/howto/operator/python.rst
##########
@@ -80,7 +80,54 @@ Otherwise you won't have access to the most context variables of Airflow in ``op
 If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and
 ``lazy_object_proxy``.
 
+
+
+.. _howto/operator:ShortCircuitOperator:
+
+ShortCircuitOperator
+========================
+
+Use the :class:`~airflow.operators.python.ShortCircuitOperator` to control whether a pipeline continues only
+if a condition is satisfied. The evaluation of this condition is done via the output of a ``python_callable``.
+If the ``python_callable`` returns True, the condition is considered satisfied and the pipeline is allowed to
+continue. In the example below, the tasks that follow the "condition_is_True" ShortCircuitOperator will
+execute while the tasks downstream of the "condition_is_False" ShortCircuitOperator will be skipped.
+
+
+.. exampleinclude:: /../../airflow/example_dags/example_short_circuit_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_short_circuit]
+    :end-before: [END howto_operator_short_circuit]
+
+
+The "short-circuiting" can be configured to either respect or ignore the ``trigger_rule`` defined
+for downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the default configuration, all
+downstream tasks are skipped without considering the ``trigger_rule`` defined for tasks.  If this parameter is
+set to False, the direct, downstream tasks are skipped but the specified ``trigger_rule`` for other subsequent
+downstream tasks is respected. In this mode, the operator assumes the direct, downstream task(s) were
+purposely meant to be skipped but perhaps not other subsequent tasks.
+
+In the example below, notice that the ShortCircuitOperator task is configured to respect downstream trigger
+rules. This means while the tasks that follow the "short_circuit" ShortCircuitOperator task will be skipped
+since the ``python_callable`` returns False, "task_7" will still execute as its set to execute when upstream
+tasks have completed running regardless of status.
+
+.. exampleinclude:: /../../airflow/example_dags/example_short_circuit_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_short_circuit_trigger_rules]
+    :end-before: [END howto_operator_short_circuit_trigger_rules]
+
+
+
+Passing in arguments
+^^^^^^^^^^^^^^^^^^^^
+
+Both the ``op_args`` and ``op_kwargs`` arguments can be used in same way as described for the PythonOperator.
+

Review comment:
       Hmm. It might be useful to keep this context so new users know that they can pass args to the `python_callable` to influence the True/False return value.
   
   The same note is in the `PythonVirtualEnvOperator` too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r690936002



##########
File path: airflow/operators/python.py
##########
@@ -26,6 +26,11 @@
 from textwrap import dedent
 from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
+try:
+    from typing import Literal
+except ImportError:
+    from typing_extensions import Literal

Review comment:
       There’s `airflow.utils.typing_compat` for this.
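
A minimal sketch of what this comment suggests: import `Literal` from the shared compatibility module rather than repeating the `try`/`except` in each file. The nested fallbacks are an assumption added here so the snippet also runs where Airflow is not installed or where the module does not export `Literal`. `ShortCircuitMode` and `describe` are hypothetical names used only for illustration.

```python
# Prefer Airflow's compatibility shim when it is available. The fallbacks
# below are only here so this sketch runs without Airflow installed.
try:
    from airflow.utils.typing_compat import Literal
except ImportError:
    try:
        from typing import Literal  # Python 3.8+
    except ImportError:
        from typing_extensions import Literal  # older interpreters

# Hypothetical alias for the two modes discussed in this review.
ShortCircuitMode = Literal["hard", "soft"]


def describe(mode: ShortCircuitMode) -> str:
    """Return a short human-readable label for the given mode."""
    return f"short-circuit mode: {mode}"
```

With the shim in place, the operator module would need only the single `from airflow.utils.typing_compat import Literal` line.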

##########
File path: airflow/operators/python.py
##########
@@ -185,16 +190,28 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
+
+    The condition is determined by the result of ``python_callable``.
 
-    The condition is determined by the result of `python_callable`.
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to `False`, downstream tasks will be skipped but the ``trigger_rule``

Review comment:
       ```suggestion
           the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
   ```

##########
File path: airflow/operators/python.py
##########
@@ -203,13 +220,26 @@ def execute(self, context: Dict):
             self.log.info('Proceeding with downstream tasks...')
             return
 
-        self.log.info('Skipping downstream tasks...')
-
         downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-        self.log.debug("Downstream task_ids %s", downstream_tasks)
+        self.log.debug("Downstream tasks %s", downstream_tasks)
 
         if downstream_tasks:
-            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
+            dag_run = context["dag_run"]
+            execution_date = context["ti"].execution_date
+
+            if self.mode == "hard":
+                self.log.info("Skipping downstream tasks with using a hard short...")
+                self.skip(dag_run, execution_date, downstream_tasks)
+            elif self.mode == "soft":
+                self.log.info("Skipping downstream tasks with using a soft short...")
+                # Explicitly setting the state of the direct, downstream task(s) to "skipped" and letting the
+                # Scheduler handle the remaining downstream task(s) appropriately.
+                self.skip(dag_run, execution_date, context["task"].get_direct_relatives(upstream=False))
+            else:
+                raise ValueError(
+                    f"The provided short-circuiting mode value, '{self.mode}', is not supported. "
+                    + "Please use either 'hard' or 'soft'."

Review comment:
       ```suggestion
                       f"The provided short-circuiting mode value, '{self.mode}', is not supported. "
                       f"Please use either 'hard' or 'soft'."
   ```
   
   This is called [string literal concatenation](https://docs.python.org/3/reference/lexical_analysis.html#string-literal-concatenation).
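
As a small illustration of the point above, adjacent string literals (f-strings included) are joined by the parser, so the explicit `+` is unnecessary. The value assigned to `mode` is a hypothetical invalid input.

```python
mode = "medium"  # hypothetical unsupported value

# Adjacent literals are concatenated at compile time; no "+" is needed.
# The second literal has no placeholders, so it does not need an f prefix.
implicit = (
    f"The provided short-circuiting mode value, '{mode}', is not supported. "
    "Please use either 'hard' or 'soft'."
)

# Equivalent message built with an explicit "+" at runtime.
explicit = (
    f"The provided short-circuiting mode value, '{mode}', is not supported. "
    + "Please use either 'hard' or 'soft'."
)

assert implicit == explicit
```

Both forms produce the same message; the implicit form simply avoids the extra operator.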










[GitHub] [airflow] github-actions[bot] commented on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-911445602


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r758721568



##########
File path: airflow/operators/python.py
##########
@@ -230,17 +230,35 @@ def execute(self, context: Dict):
 
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
-    Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    Allows a workflow to continue only if a condition is met. Otherwise, the workflow "short-circuits" and
+    downstream tasks are skipped. The short-circuiting can be configured to either respect or ignore the
+    ``trigger_rule`` set for downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the
+    default setting, all downstream tasks are skipped without considering the ``trigger_rule`` defined for
+    tasks.  However, if this parameter is set to False, the direct, downstream tasks are skipped but the
+    specified ``trigger_rule`` for other subsequent downstream tasks are respected. In this mode,
+    the operator assumes the direct, downstream tasks were purposely meant to be skipped but perhaps
+    not other subsequent tasks.
 
-    The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    The ShortCircuitOperator is derived from the PythonOperator. It evaluates a condition and short-circuits
+    the workflow if the condition is False. Any downstream tasks are marked with a state of "skipped" based
+    on the short-circuiting mode configured. If the condition is True, downstream tasks proceed as normal.

Review comment:
       Yep agreed. I'll update.







[GitHub] [airflow] uranusjr commented on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-982286120


   Merging main to see if the improved tests can make this green.





[GitHub] [airflow] uranusjr commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r690936498



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +190,28 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
+
+    The condition is determined by the result of ``python_callable``.
 
-    The condition is determined by the result of `python_callable`.
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to `False`, downstream tasks will be skipped but the ``trigger_rule``

Review comment:
       ```suggestion
           the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r690121805



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +185,27 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    perform a "hard" or "soft" short.  In a "hard short", all downstream tasks are skipped without considering
+    the ``trigger_rule`` defined for tasks.  In "soft short", the ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
     condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    downstream tasks are marked with a state of "skipped" based on the short-circuiting mode configured. If
+    the condition is True, downstream tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param do_hard_short: If `True`, all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to `False`, downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type do_hard_short: bool
     """
 
+    def __init__(self, *, do_hard_short: bool = True, **kwargs) -> None:
+        super().__init__(**kwargs)
+        self.do_hard_short = do_hard_short

Review comment:
       Does the argument need to be this long? I feel something like this would be equally readable considering the operator’s name:
   
   ```python
   ShortCircuitOperator(
       task_id="...",
       hard=False,
   )
   ```
   
   although this may be even better:
   
   ```python
   ShortCircuitOperator(
       task_id="...",
       mode="soft",  # typed as Literal["soft", "hard"]
   )
   ```
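
One caveat worth sketching for the `Literal` variant: the annotation is enforced only by static type checkers, so a runtime check is still needed if invalid values should be rejected. The class below is a hypothetical stand-in, not the Airflow operator, and assumes Python 3.8+ for `typing.Literal` and `typing.get_args`.

```python
from typing import Literal, get_args

Mode = Literal["soft", "hard"]


class ShortCircuitSketch:
    """Hypothetical stand-in used only to illustrate the parameter shape."""

    def __init__(self, *, task_id: str, mode: Mode = "hard") -> None:
        # Literal is not checked at runtime, so validate explicitly.
        if mode not in get_args(Mode):
            raise ValueError(
                f"Unsupported mode {mode!r}; use one of {get_args(Mode)}."
            )
        self.task_id = task_id
        self.mode = mode


op = ShortCircuitSketch(task_id="example", mode="soft")
```

Deriving the allowed values from `get_args(Mode)` keeps the annotation and the runtime check in one place.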







[GitHub] [airflow] josh-fell commented on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-892684861


   @eladkal If you have some time, I'd love to get your feedback on the logic in this PR!





[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r736645499



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       @uranusjr that was the basis for my suggestion of `ignore_downstream_trigger_rules`







[GitHub] [airflow] josh-fell commented on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-981683414


   > @josh-fell can you take a look at the test failures?
   
   Should be fixed. There were other changes made to `test_python.py` that required the `logging` package which I had removed as part of this PR. Added the import back.





[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r758449147



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       In previous iterations we did have "soft/hard modes" and `do_hard_short`, but the question was: what do they _mean_? While something like `ignore_downstream_trigger_rules` is long, it is pretty explicit and more understandable than the `modes`/`do_hard_short` names. IMO the same goes for `soft_skip` (or `soft_fail` in the sensors). They are succinct parameter names, but what do they _mean_? Also, there is more consistency (albeit loose) around `ignore_...` flag nomenclature in Airflow.
   
   Even if users look at the documentation/docstrings anyway to figure out what the parameter does, I still think following the "explicit is better than implicit" principle is the way to go.
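
To make the meaning of the flag concrete, here is a plain-Python sketch (no Airflow imports) of which tasks the operator would mark as skipped under each setting. It mirrors the diff earlier in this thread, where the "hard" path skips everything from `get_flat_relatives(upstream=False)` and the "soft" path skips only `get_direct_relatives(upstream=False)`. The DAG layout, task names and helper functions are hypothetical.

```python
from typing import Dict, List, Set

# Hypothetical DAG: task_id -> direct downstream task_ids.
DAG: Dict[str, List[str]] = {
    "short_circuit": ["task_1"],
    "task_1": ["task_2"],
    "task_2": ["join"],
    "other_branch": ["join"],
    "join": [],
}


def all_downstream(dag: Dict[str, List[str]], task_id: str) -> Set[str]:
    """Collect every task reachable downstream of task_id."""
    seen: Set[str] = set()
    stack = list(dag[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(dag[current])
    return seen


def tasks_to_skip(
    dag: Dict[str, List[str]],
    task_id: str,
    ignore_downstream_trigger_rules: bool = True,
) -> Set[str]:
    """Return the tasks the short-circuit itself marks as skipped."""
    if ignore_downstream_trigger_rules:
        # Skip everything downstream, whatever the trigger rules say.
        return all_downstream(dag, task_id)
    # Skip only direct children; the scheduler decides the rest.
    return set(dag[task_id])
```

With the flag set to False, only `task_1` is skipped directly, and whether `task_2` and `join` run is left to the scheduler's evaluation of their trigger rules.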







[GitHub] [airflow] josh-fell edited a comment on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell edited a comment on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-986163436


   ~Apparently I shouldn't have rebased from the last commit in the branch which was the merge from `main` to this branch; it wiped all of my changes and closed the PR.~ Something went awry with the rebase. Maybe the wrong hash was generated after `merge-base`. Not sure but should have verified. Sad panda.
   
   I have opened a new PR - #20044. Sorry about all this gents.





[GitHub] [airflow] potiuk commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r762442792



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       Looks much better now indeed. Hard/Soft was extremely difficult to mentally map to what it could mean.







[GitHub] [airflow] potiuk closed pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #17421:
URL: https://github.com/apache/airflow/pull/17421


   





[GitHub] [airflow] eladkal commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r719360368



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       I wonder if it would be better to have:
   ```suggestion
       downstream_trigger_rules_mode = ["respect", "ignore"]
   ```
   At least for me, I can understand from the parameter name and options what it means without looking it up in the docstring/documentation. WDYT?







[GitHub] [airflow] uranusjr commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r734289668



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       Perhaps. Airflow isn't that consistent with this sort of flag right now, but I guess `ignore_` is used more? (See e.g. `ignore_task_deps`.)







[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r738476033



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       @eladkal If you are OK with the name, I can go ahead and update the code.







[GitHub] [airflow] josh-fell closed pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell closed pull request #17421:
URL: https://github.com/apache/airflow/pull/17421


   





[GitHub] [airflow] eladkal commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r758703752



##########
File path: airflow/operators/python.py
##########
@@ -230,17 +230,35 @@ def execute(self, context: Dict):
 
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
-    Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    Allows a workflow to continue only if a condition is met. Otherwise, the workflow "short-circuits" and
+    downstream tasks are skipped. The short-circuiting can be configured to either respect or ignore the
+    ``trigger_rule`` set for downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the
+    default setting, all downstream tasks are skipped without considering the ``trigger_rule`` defined for
+    tasks.  However, if this parameter is set to False, the direct, downstream tasks are skipped but the

Review comment:
       ```suggestion
       tasks. However, if this parameter is set to False, the direct downstream tasks are skipped but the
   ```

##########
File path: airflow/operators/python.py
##########
@@ -230,17 +230,35 @@ def execute(self, context: Dict):
 
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
-    Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    Allows a workflow to continue only if a condition is met. Otherwise, the workflow "short-circuits" and
+    downstream tasks are skipped. The short-circuiting can be configured to either respect or ignore the
+    ``trigger_rule`` set for downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the
+    default setting, all downstream tasks are skipped without considering the ``trigger_rule`` defined for
+    tasks.  However, if this parameter is set to False, the direct, downstream tasks are skipped but the
+    specified ``trigger_rule`` for other subsequent downstream tasks are respected. In this mode,
+    the operator assumes the direct, downstream tasks were purposely meant to be skipped but perhaps
+    not other subsequent tasks.
 
-    The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    The ShortCircuitOperator is derived from the PythonOperator. It evaluates a condition and short-circuits
+    the workflow if the condition is False. Any downstream tasks are marked with a state of "skipped" based
+    on the short-circuiting mode configured. If the condition is True, downstream tasks proceed as normal.

Review comment:
       I think the docstring should start with that paragraph, as it explains what the operator does, while the previous paragraph explains a specific parameter.
   Also, it seems like there is repeated explanation between this paragraph and the previous one.

##########
File path: docs/apache-airflow/howto/operator/python.rst
##########
@@ -80,7 +80,54 @@ Otherwise you won't have access to the most context variables of Airflow in ``op
 If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and
 ``lazy_object_proxy``.
 
+
+
+.. _howto/operator:ShortCircuitOperator:
+
+ShortCircuitOperator
+========================
+
+Use the :class:`~airflow.operators.python.ShortCircuitOperator` to control whether a pipeline continues only
+if a condition is satisfied. The evaluation of this condition is done via the output of a ``python_callable``.
+If the ``python_callable`` returns True, the condition is considered satisfied and the pipeline is allowed to
+continue. In the example below, the tasks that follow the "condition_is_True" ShortCircuitOperator will
+execute while the tasks downstream of the "condition_is_False" ShortCircuitOperator will be skipped.
+
+
+.. exampleinclude:: /../../airflow/example_dags/example_short_circuit_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_short_circuit]
+    :end-before: [END howto_operator_short_circuit]
+
+
+The "short-circuiting" can be configured to either respect or ignore the ``trigger_rule`` defined
+for downstream tasks. If ``ignore_downstream_trigger_rules`` is set to True, the default configuration, all
+downstream tasks are skipped without considering the ``trigger_rule`` defined for tasks.  If this parameter is
+set to False, the direct, downstream tasks are skipped but the specified ``trigger_rule`` for other subsequent
+downstream tasks are respected. In this mode, the operator assumes the direct, downstream task(s) were
+purposely meant to be skipped but perhaps not other subsequent tasks.
+
+In the example below, notice that the ShortCircuitOperator task is configured to respect downstream trigger
+rules. This means while the tasks that follow the "short_circuit" ShortCircuitOperator task will be skipped
+since the ``python_callable`` returns False, "task_7" will still execute as its set to execute when upstream
+tasks have completed running regardless of status.
+
+.. exampleinclude:: /../../airflow/example_dags/example_short_circuit_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_short_circuit_trigger_rules]
+    :end-before: [END howto_operator_short_circuit_trigger_rules]
+
+
+
+Passing in arguments
+^^^^^^^^^^^^^^^^^^^^
+
+Both the ``op_args`` and ``op_kwargs`` arguments can be used in same way as described for the PythonOperator.
+

Review comment:
       I think this is redundant..?







[GitHub] [airflow] eladkal commented on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
eladkal commented on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-978902112


   @josh-fell  can you take a look at the test failures?





[GitHub] [airflow] uranusjr commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r733807930



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       The biggest problem I have with `do_hard_short` is that it's difficult to internalise (is it `do_soft_short=True` or `do_hard_short=False`?), so I think the same would still apply if we change this to a longer name. `downstream_trigger_mode="xxx"` is long, but not difficult to remember, so it's fine IMO. I'm less sure about the mode names though; from the original issue description, maybe something like `direct` and `all`, or `direct-only` and `recursive` would work better.
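
To make the proposed `direct` and `all` names concrete, here is a minimal sketch of which tasks each mode would select for skipping. It assumes a plain dictionary in place of a real DAG; the task names, the `DOWNSTREAM` mapping, and the helper functions are hypothetical and are not part of Airflow or of this PR.

```python
from typing import Dict, List, Set

# Hypothetical graph: short_circuit -> a -> b -> join, and other -> join.
DOWNSTREAM: Dict[str, List[str]] = {
    "short_circuit": ["a"],
    "a": ["b"],
    "b": ["join"],
    "other": ["join"],
    "join": [],
}


def direct_downstream(task_id: str) -> Set[str]:
    """Return only the immediate children of a task."""
    return set(DOWNSTREAM[task_id])


def all_downstream(task_id: str) -> Set[str]:
    """Return every task reachable from a task (children, grandchildren, ...)."""
    seen: Set[str] = set()
    stack = list(DOWNSTREAM[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(DOWNSTREAM[current])
    return seen


def tasks_to_skip(task_id: str, mode: str) -> Set[str]:
    """Pick the tasks a short-circuit would mark as skipped for a given mode."""
    if mode == "all":
        return all_downstream(task_id)
    if mode == "direct":
        return direct_downstream(task_id)
    raise ValueError(f"Unsupported mode '{mode}'. Please use either 'direct' or 'all'.")
```

With `mode="direct"` only `a` is selected, so `b` and `join` are left for the scheduler to evaluate against their own trigger rules. With `mode="all"`, `a`, `b`, and `join` are all selected regardless of trigger rules.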







[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r691294438



##########
File path: airflow/operators/python.py
##########
@@ -26,6 +26,11 @@
 from textwrap import dedent
 from typing import Any, Callable, Dict, Iterable, List, Optional, Union
 
+try:
+    from typing import Literal
+except ImportError:
+    from typing_extensions import Literal

Review comment:
       Very cool. Thanks for the pro tip!







[GitHub] [airflow] uranusjr commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r690937723



##########
File path: airflow/operators/python.py
##########
@@ -203,13 +220,26 @@ def execute(self, context: Dict):
             self.log.info('Proceeding with downstream tasks...')
             return
 
-        self.log.info('Skipping downstream tasks...')
-
         downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-        self.log.debug("Downstream task_ids %s", downstream_tasks)
+        self.log.debug("Downstream tasks %s", downstream_tasks)
 
         if downstream_tasks:
-            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
+            dag_run = context["dag_run"]
+            execution_date = context["ti"].execution_date
+
+            if self.mode == "hard":
+                self.log.info("Skipping downstream tasks with using a hard short...")
+                self.skip(dag_run, execution_date, downstream_tasks)
+            elif self.mode == "soft":
+                self.log.info("Skipping downstream tasks with using a soft short...")
+                # Explicitly setting the state of the direct, downstream task(s) to "skipped" and letting the
+                # Scheduler handle the remaining downstream task(s) appropriately.
+                self.skip(dag_run, execution_date, context["task"].get_direct_relatives(upstream=False))
+            else:
+                raise ValueError(
+                    f"The provided short-circuiting mode value, '{self.mode}', is not supported. "
+                    + "Please use either 'hard' or 'soft'."

Review comment:
       ```suggestion
                       f"The provided short-circuiting mode value, '{self.mode}', is not supported. "
                       f"Please use either 'hard' or 'soft'."
   ```
   
   This is called [string literal concatenation](https://docs.python.org/3/reference/lexical_analysis.html#string-literal-concatenation).
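
As a small standalone illustration of the suggestion above: adjacent string literals, f-strings included, are joined by the parser, so the `+` is not needed. The `mode` value below is a made-up invalid value used only for the demonstration.

```python
mode = "medium"  # hypothetical invalid value, for illustration only

# Adjacent literals inside the parentheses are concatenated implicitly.
# The second literal has no placeholders, so it does not need an "f" prefix.
implicit = (
    f"The provided short-circuiting mode value, '{mode}', is not supported. "
    "Please use either 'hard' or 'soft'."
)

# The explicit "+" version from the original diff builds the same string.
explicit = (
    f"The provided short-circuiting mode value, '{mode}', is not supported. "
    + "Please use either 'hard' or 'soft'."
)
```

Both names end up bound to the same text; the implicit form just drops the operator.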







[GitHub] [airflow] raphaelauv commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
raphaelauv commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r749249179



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       Why not use the same semantics as for the sensors (soft/hard)? See:
   
   https://github.com/apache/airflow/pull/8867
   
   soft_skip:bool = False







[GitHub] [airflow] josh-fell commented on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-986163436


   Apparently I shouldn't have rebased from the last commit in the branch which was the merge from `main` to this branch; it wiped all of my changes and closed the PR. Sad panda.
   
   I have opened a new PR - #20044. Sorry about all this gents.





[GitHub] [airflow] eladkal commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r738510897



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       yep







[GitHub] [airflow] josh-fell commented on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-914802601


   Rebased and resolved conflicts.





[GitHub] [airflow] SamWheating commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r699568515



##########
File path: airflow/operators/python.py
##########
@@ -203,13 +216,26 @@ def execute(self, context: Dict):
             self.log.info('Proceeding with downstream tasks...')
             return
 
-        self.log.info('Skipping downstream tasks...')
-
         downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-        self.log.debug("Downstream task_ids %s", downstream_tasks)
+        self.log.debug("Downstream tasks %s", downstream_tasks)
 
         if downstream_tasks:
-            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
+            dag_run = context["dag_run"]
+            execution_date = context["ti"].execution_date
+
+            if self.mode == "hard":
+                self.log.info("Skipping downstream tasks using a hard short...")
+                self.skip(dag_run, execution_date, downstream_tasks)
+            elif self.mode == "soft":
+                self.log.info("Skipping downstream tasks using a soft short...")
+                # Explicitly setting the state of the direct, downstream task(s) to "skipped" and letting the
+                # Scheduler handle the remaining downstream task(s) appropriately.
+                self.skip(dag_run, execution_date, context["task"].get_direct_relatives(upstream=False))
+            else:

Review comment:
       Would it be preferable to perform this validation on initialization rather than on execution?
   
   In its current state, if an invalid value is provided for `mode`, this task will be submitted to the executor and fail, which could then lead to unintended execution of downstream tasks.
   
   If we validated this value and raised in the `__init__` method, it would raise on import, which would prevent the broken code from ever reaching the executor. 
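
A minimal sketch of the init-time validation described above. It uses a stand-in class rather than the real operator so that it runs without Airflow installed; the class name `ShortCircuitSketch` is hypothetical, and the real `__init__` would also forward its arguments to the parent operator.

```python
class ShortCircuitSketch:
    """Stand-in for the operator, used only to show where validation would live."""

    modes = ("hard", "soft")

    def __init__(self, *, mode: str = "hard") -> None:
        # Fail while the object is being constructed (i.e. when the DAG file
        # is parsed) instead of waiting until execute() runs on a worker.
        if mode not in self.modes:
            raise ValueError(
                f"The provided short-circuiting mode value, '{mode}', is not supported. "
                "Please use either 'hard' or 'soft'."
            )
        self.mode = mode
```

Constructing `ShortCircuitSketch(mode="medium")` raises `ValueError` immediately, so a misconfigured task would never be handed to the executor.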







[GitHub] [airflow] josh-fell commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r700243406



##########
File path: airflow/operators/python.py
##########
@@ -203,13 +216,26 @@ def execute(self, context: Dict):
             self.log.info('Proceeding with downstream tasks...')
             return
 
-        self.log.info('Skipping downstream tasks...')
-
         downstream_tasks = context['task'].get_flat_relatives(upstream=False)
-        self.log.debug("Downstream task_ids %s", downstream_tasks)
+        self.log.debug("Downstream tasks %s", downstream_tasks)
 
         if downstream_tasks:
-            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
+            dag_run = context["dag_run"]
+            execution_date = context["ti"].execution_date
+
+            if self.mode == "hard":
+                self.log.info("Skipping downstream tasks using a hard short...")
+                self.skip(dag_run, execution_date, downstream_tasks)
+            elif self.mode == "soft":
+                self.log.info("Skipping downstream tasks using a soft short...")
+                # Explicitly setting the state of the direct, downstream task(s) to "skipped" and letting the
+                # Scheduler handle the remaining downstream task(s) appropriately.
+                self.skip(dag_run, execution_date, context["task"].get_direct_relatives(upstream=False))
+            else:

Review comment:
       Very fair point to handle edge cases.  Thanks @SamWheating!







[GitHub] [airflow] raphaelauv commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
raphaelauv commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r756722217



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operated in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: bool
     """
 
+    modes = ["hard", "soft"]

Review comment:
       @josh-fell ?







[GitHub] [airflow] potiuk commented on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-986055227


   Let's see the tests after all the changes are merged.





[GitHub] [airflow] josh-fell edited a comment on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell edited a comment on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-986163436


   ~Apparently I shouldn't have rebased from the last commit in the branch which was the merge from `main` to this branch~ Something went awry with the rebase. It wiped all of my changes and closed the PR. I can't figure out what happened, but maybe the wrong hash was generated by `merge-base`. I'm not sure, but I should have verified. Sad panda.
   
   I have opened a new PR - #20044. Sorry about all this gents.





[GitHub] [airflow] eladkal commented on a change in pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#discussion_r733825886



##########
File path: airflow/operators/python.py
##########
@@ -185,16 +186,37 @@ def execute(self, context: Dict):
 class ShortCircuitOperator(PythonOperator, SkipMixin):
     """
     Allows a workflow to continue only if a condition is met. Otherwise, the
-    workflow "short-circuits" and downstream tasks are skipped.
+    workflow "short-circuits" and downstream tasks are skipped. The short-circuiting can be configured to
+    operate in one of two modes: "hard" or "soft".  In "hard" mode, the default mode, all downstream tasks
+    are skipped without considering the ``trigger_rule`` defined for tasks.  In "soft" mode, the
+    ``trigger_rule`` is respected.
 
     The ShortCircuitOperator is derived from the PythonOperator. It evaluates a
-    condition and short-circuits the workflow if the condition is False. Any
-    downstream tasks are marked with a state of "skipped". If the condition is
-    True, downstream tasks proceed as normal.
+    condition and short-circuits the workflow if the condition is False. Any downstream tasks are marked with
+    a state of "skipped" based on the short-circuiting mode configured. If the condition is True, downstream
+    tasks proceed as normal.
 
-    The condition is determined by the result of `python_callable`.
+    The condition is determined by the result of ``python_callable``.
+
+    :param mode: If set to "hard", all downstream tasks from this operator task will be skipped.  This is
+        the default behavior.  If set to "soft", downstream tasks will be skipped but the ``trigger_rule``
+        defined for a task will be respected.
+    :type mode: str
     """
 
+    modes = ["hard", "soft"]

Review comment:
       What about `respect_trigger_rules = True/False` ?
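
To illustrate the distinction under discussion, here is a minimal sketch that does not use Airflow at all. The function `tasks_to_skip`, the `downstream` mapping, and the task names are hypothetical and exist only for this example. The `respect_trigger_rules` flag borrows the name suggested in the comment above and is not a confirmed parameter of the operator. With the flag off, every transitive downstream task is skipped ("hard" mode). With it on, only the direct downstream tasks are skipped ("soft" mode), which leaves it to the scheduler to evaluate trigger rules for anything further down.

```python
from collections import deque
from typing import Dict, List, Set


def tasks_to_skip(
    downstream: Dict[str, List[str]],
    short_circuit_task: str,
    respect_trigger_rules: bool = False,
) -> Set[str]:
    """Return the task ids a short-circuit would mark as skipped.

    Hypothetical helper for illustration only; not Airflow's implementation.
    """
    direct = set(downstream.get(short_circuit_task, []))
    if respect_trigger_rules:
        # "Soft" short: skip only the immediate downstream tasks.
        return direct

    # "Hard" short: walk the graph and skip everything reachable downstream.
    skipped: Set[str] = set()
    queue = deque(direct)
    while queue:
        task = queue.popleft()
        if task in skipped:
            continue
        skipped.add(task)
        queue.extend(downstream.get(task, []))
    return skipped


# Two branches that converge on "join"; only one branch is short-circuited.
dag = {
    "short_circuit": ["transform"],
    "transform": ["join"],
    "other_branch": ["join"],
    "join": ["report"],
}

hard = tasks_to_skip(dag, "short_circuit")
soft = tasks_to_skip(dag, "short_circuit", respect_trigger_rules=True)
print(sorted(hard))  # ['join', 'report', 'transform']
print(sorted(soft))  # ['transform']
```

In the soft case, `join` is not skipped up front, so a trigger rule on `join` that tolerates a skipped upstream task could still let it run once `other_branch` finishes.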







[GitHub] [airflow] josh-fell commented on pull request #17421: Add ShortCircuitOperator configurability for respecting downstream trigger rules

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #17421:
URL: https://github.com/apache/airflow/pull/17421#issuecomment-892684861


   @eladkal If you have some time, I'd love to get your feedback on the logic in this PR!

