You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/12 15:59:31 UTC

[airflow] 01/04: Add more weekday operator and sensor examples #26071 (#26098)

This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a10fc9aefee0054effb2ccb24717d90bad53c6a9
Author: Igor <40...@users.noreply.github.com>
AuthorDate: Fri Sep 9 04:05:52 2022 +0200

    Add more weekday operator and sensor examples #26071 (#26098)
    
    (cherry picked from commit dd6b2e4e6cb89d9eea2f3db790cb003a2e89aeff)
---
 .../example_branch_day_of_week_operator.py         | 13 +++++++-
 airflow/operators/weekday.py                       | 39 +++++++++++++++++++++-
 airflow/sensors/weekday.py                         | 15 +++++++--
 3 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/airflow/example_dags/example_branch_day_of_week_operator.py b/airflow/example_dags/example_branch_day_of_week_operator.py
index 62b0bc6ce9..76b524b932 100644
--- a/airflow/example_dags/example_branch_day_of_week_operator.py
+++ b/airflow/example_dags/example_branch_day_of_week_operator.py
@@ -24,6 +24,7 @@ import pendulum
 from airflow import DAG
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils.weekday import WeekDay
 
 with DAG(
     dag_id="example_weekday_branch_operator",
@@ -35,6 +36,8 @@ with DAG(
     # [START howto_operator_day_of_week_branch]
     empty_task_1 = EmptyOperator(task_id='branch_true')
     empty_task_2 = EmptyOperator(task_id='branch_false')
+    empty_task_3 = EmptyOperator(task_id='branch_weekend')
+    empty_task_4 = EmptyOperator(task_id='branch_mid_week')
 
     branch = BranchDayOfWeekOperator(
         task_id="make_choice",
@@ -42,7 +45,15 @@ with DAG(
         follow_task_ids_if_false="branch_false",
         week_day="Monday",
     )
+    branch_weekend = BranchDayOfWeekOperator(
+        task_id="make_weekend_choice",
+        follow_task_ids_if_true="branch_weekend",
+        follow_task_ids_if_false="branch_mid_week",
+        week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
+    )
 
-    # Run empty_task_1 if branch executes on Monday
+    # Run empty_task_1 if branch executes on Monday, empty_task_2 otherwise
     branch >> [empty_task_1, empty_task_2]
+    # Run empty_task_3 if it's a weekend, empty_task_4 otherwise
+    empty_task_2 >> branch_weekend >> [empty_task_3, empty_task_4]
     # [END howto_operator_day_of_week_branch]
diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py
index 2d3aa0bda9..1bb8354d19 100644
--- a/airflow/operators/weekday.py
+++ b/airflow/operators/weekday.py
@@ -31,6 +31,40 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
     For more information on how to use this operator, take a look at the guide:
     :ref:`howto/operator:BranchDayOfWeekOperator`
 
+    **Example** (with single day): ::
+
+        from airflow.operators.empty import EmptyOperator
+
+        monday = EmptyOperator(task_id='monday')
+        other_day = EmptyOperator(task_id='other_day')
+
+        monday_check = BranchDayOfWeekOperator(
+            task_id='monday_check',
+            week_day='Monday',
+            use_task_logical_date=True,
+            follow_task_ids_if_true='monday',
+            follow_task_ids_if_false='other_day',
+            dag=dag)
+        monday_check >> [monday, other_day]
+
+    **Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): ::
+
+        # import WeekDay Enum
+        from airflow.utils.weekday import WeekDay
+        from airflow.operators.empty import EmptyOperator
+
+        workday = EmptyOperator(task_id='workday')
+        weekend = EmptyOperator(task_id='weekend')
+        weekend_check = BranchDayOfWeekOperator(
+            task_id='weekend_check',
+            week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
+            use_task_logical_date=True,
+            follow_task_ids_if_true='weekend',
+            follow_task_ids_if_false='workday',
+            dag=dag)
+        # add downstream dependencies as you would do with any branch operator
+        weekend_check >> [workday, weekend]
+
     :param follow_task_ids_if_true: task id or task ids to follow if criteria met
     :param follow_task_ids_if_false: task id or task ids to follow if criteria does not met
     :param week_day: Day of the week to check (full name). Optionally, a set
@@ -42,9 +76,12 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
             * ``{WeekDay.TUESDAY}``
             * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
 
+        To use ``WeekDay`` enum, import it from ``airflow.utils.weekday``
+
     :param use_task_logical_date: If ``True``, uses task's logical date to compare
         with is_today. Execution Date is Useful for backfilling.
         If ``False``, uses system's day of the week.
+    :param use_task_execution_day: deprecated parameter, same effect as ``use_task_logical_date``
     """
 
     def __init__(
@@ -52,7 +89,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
         *,
         follow_task_ids_if_true: Union[str, Iterable[str]],
         follow_task_ids_if_false: Union[str, Iterable[str]],
-        week_day: Union[str, Iterable[str]],
+        week_day: Union[str, Iterable[str], WeekDay, Iterable[WeekDay]],
         use_task_logical_date: bool = False,
         use_task_execution_day: bool = False,
         **kwargs,
diff --git a/airflow/sensors/weekday.py b/airflow/sensors/weekday.py
index ec5abbb413..da0ba9591a 100644
--- a/airflow/sensors/weekday.py
+++ b/airflow/sensors/weekday.py
@@ -16,6 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import warnings
+from typing import Iterable, Union
 
 from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.sensors.base import BaseSensorOperator
@@ -66,13 +67,23 @@ class DayOfWeekSensor(BaseSensorOperator):
             * ``{WeekDay.TUESDAY}``
             * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
 
+        To use ``WeekDay`` enum, import it from ``airflow.utils.weekday``
+
     :param use_task_logical_date: If ``True``, uses task's logical date to compare
         with week_day. Execution Date is Useful for backfilling.
         If ``False``, uses system's day of the week. Useful when you
         don't want to run anything on weekdays on the system.
+    :param use_task_execution_day: deprecated parameter, same effect as ``use_task_logical_date``
     """
 
-    def __init__(self, *, week_day, use_task_logical_date=False, use_task_execution_day=False, **kwargs):
+    def __init__(
+        self,
+        *,
+        week_day: Union[str, Iterable[str], WeekDay, Iterable[WeekDay]],
+        use_task_logical_date: bool = False,
+        use_task_execution_day: bool = False,
+        **kwargs,
+    ) -> None:
         super().__init__(**kwargs)
         self.week_day = week_day
         self.use_task_logical_date = use_task_logical_date
@@ -85,7 +96,7 @@ class DayOfWeekSensor(BaseSensorOperator):
             )
         self._week_day_num = WeekDay.validate_week_day(week_day)
 
-    def poke(self, context: Context):
+    def poke(self, context: Context) -> bool:
         self.log.info(
             'Poking until weekday is in %s, Today is %s',
             self.week_day,