You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/09/12 15:59:31 UTC
[airflow] 01/04: Add more weekday operator and sensor examples #26071 (#26098)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch v2-4-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit a10fc9aefee0054effb2ccb24717d90bad53c6a9
Author: Igor <40...@users.noreply.github.com>
AuthorDate: Fri Sep 9 04:05:52 2022 +0200
Add more weekday operator and sensor examples #26071 (#26098)
(cherry picked from commit dd6b2e4e6cb89d9eea2f3db790cb003a2e89aeff)
---
.../example_branch_day_of_week_operator.py | 13 +++++++-
airflow/operators/weekday.py | 39 +++++++++++++++++++++-
airflow/sensors/weekday.py | 15 +++++++--
3 files changed, 63 insertions(+), 4 deletions(-)
diff --git a/airflow/example_dags/example_branch_day_of_week_operator.py b/airflow/example_dags/example_branch_day_of_week_operator.py
index 62b0bc6ce9..76b524b932 100644
--- a/airflow/example_dags/example_branch_day_of_week_operator.py
+++ b/airflow/example_dags/example_branch_day_of_week_operator.py
@@ -24,6 +24,7 @@ import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils.weekday import WeekDay
with DAG(
dag_id="example_weekday_branch_operator",
@@ -35,6 +36,8 @@ with DAG(
# [START howto_operator_day_of_week_branch]
empty_task_1 = EmptyOperator(task_id='branch_true')
empty_task_2 = EmptyOperator(task_id='branch_false')
+ empty_task_3 = EmptyOperator(task_id='branch_weekend')
+ empty_task_4 = EmptyOperator(task_id='branch_mid_week')
branch = BranchDayOfWeekOperator(
task_id="make_choice",
@@ -42,7 +45,15 @@ with DAG(
follow_task_ids_if_false="branch_false",
week_day="Monday",
)
+ branch_weekend = BranchDayOfWeekOperator(
+ task_id="make_weekend_choice",
+ follow_task_ids_if_true="branch_weekend",
+ follow_task_ids_if_false="branch_mid_week",
+ week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
+ )
- # Run empty_task_1 if branch executes on Monday
+ # Run empty_task_1 if branch executes on Monday, empty_task_2 otherwise
branch >> [empty_task_1, empty_task_2]
+ # Run empty_task_3 if it's a weekend, empty_task_4 otherwise
+ empty_task_2 >> branch_weekend >> [empty_task_3, empty_task_4]
# [END howto_operator_day_of_week_branch]
diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py
index 2d3aa0bda9..1bb8354d19 100644
--- a/airflow/operators/weekday.py
+++ b/airflow/operators/weekday.py
@@ -31,6 +31,40 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BranchDayOfWeekOperator`
+ **Example** (with single day): ::
+
+ from airflow.operators.empty import EmptyOperator
+
+ monday = EmptyOperator(task_id='monday')
+ other_day = EmptyOperator(task_id='other_day')
+
+ monday_check = BranchDayOfWeekOperator(
+ task_id='monday_check',
+ week_day='Monday',
+ use_task_logical_date=True,
+ follow_task_ids_if_true='monday',
+ follow_task_ids_if_false='other_day',
+ dag=dag)
+ monday_check >> [monday, other_day]
+
+ **Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): ::
+
+ # import WeekDay Enum
+ from airflow.utils.weekday import WeekDay
+ from airflow.operators.empty import EmptyOperator
+
+ workday = EmptyOperator(task_id='workday')
+ weekend = EmptyOperator(task_id='weekend')
+ weekend_check = BranchDayOfWeekOperator(
+ task_id='weekend_check',
+ week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
+ use_task_logical_date=True,
+ follow_task_ids_if_true='weekend',
+ follow_task_ids_if_false='workday',
+ dag=dag)
+ # add downstream dependencies as you would do with any branch operator
+ weekend_check >> [workday, weekend]
+
:param follow_task_ids_if_true: task id or task ids to follow if criteria met
:param follow_task_ids_if_false: task id or task ids to follow if criteria is not met
:param week_day: Day of the week to check (full name). Optionally, a set
@@ -42,9 +76,12 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
* ``{WeekDay.TUESDAY}``
* ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
+ To use ``WeekDay`` enum, import it from ``airflow.utils.weekday``
+
:param use_task_logical_date: If ``True``, uses task's logical date to compare
with is_today. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week.
+ :param use_task_execution_day: deprecated parameter, same effect as ``use_task_logical_date``
"""
def __init__(
@@ -52,7 +89,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
*,
follow_task_ids_if_true: Union[str, Iterable[str]],
follow_task_ids_if_false: Union[str, Iterable[str]],
- week_day: Union[str, Iterable[str]],
+ week_day: Union[str, Iterable[str], WeekDay, Iterable[WeekDay]],
use_task_logical_date: bool = False,
use_task_execution_day: bool = False,
**kwargs,
diff --git a/airflow/sensors/weekday.py b/airflow/sensors/weekday.py
index ec5abbb413..da0ba9591a 100644
--- a/airflow/sensors/weekday.py
+++ b/airflow/sensors/weekday.py
@@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import warnings
+from typing import Iterable, Union
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.sensors.base import BaseSensorOperator
@@ -66,13 +67,23 @@ class DayOfWeekSensor(BaseSensorOperator):
* ``{WeekDay.TUESDAY}``
* ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
+ To use ``WeekDay`` enum, import it from ``airflow.utils.weekday``
+
:param use_task_logical_date: If ``True``, uses task's logical date to compare
with week_day. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week. Useful when you
don't want to run anything on weekdays on the system.
+ :param use_task_execution_day: deprecated parameter, same effect as ``use_task_logical_date``
"""
- def __init__(self, *, week_day, use_task_logical_date=False, use_task_execution_day=False, **kwargs):
+ def __init__(
+ self,
+ *,
+ week_day: Union[str, Iterable[str], WeekDay, Iterable[WeekDay]],
+ use_task_logical_date: bool = False,
+ use_task_execution_day: bool = False,
+ **kwargs,
+ ) -> None:
super().__init__(**kwargs)
self.week_day = week_day
self.use_task_logical_date = use_task_logical_date
@@ -85,7 +96,7 @@ class DayOfWeekSensor(BaseSensorOperator):
)
self._week_day_num = WeekDay.validate_week_day(week_day)
- def poke(self, context: Context):
+ def poke(self, context: Context) -> bool:
self.log.info(
'Poking until weekday is in %s, Today is %s',
self.week_day,