You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/29 15:19:56 UTC

[airflow] 11/45: Replace `use_task_execution_date` with `use_task_logical_date` (#23983)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit aa3442824608d5936fc7cc12065b8dc3f71e5237
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Sat May 28 23:10:39 2022 +0300

    Replace `use_task_execution_date` with `use_task_logical_date` (#23983)
    
    * Replace `use_task_execution_date` with `use_task_logical_date`
    We have some operators/sensors that use `*_execution_date` as class parameters. This PR deprecates the usage of these parameters and replaces them with `logical_date`.
    There is no change in functionality: under the hood the code already uses `logical_date`; this is only about the parameter names as exposed to users.
    
    (cherry picked from commit 614b2329c1603ef1e2199044e2cc9e4b7332c2e0)
---
 airflow/operators/datetime.py        | 15 ++++++++++++---
 airflow/operators/weekday.py         | 16 ++++++++++++----
 airflow/sensors/weekday.py           | 22 +++++++++++++++-------
 tests/operators/test_datetime.py     | 21 +++++++++++++++++++--
 tests/operators/test_weekday.py      | 19 +++++++++++++++++--
 tests/sensors/test_weekday_sensor.py | 25 ++++++++++++++++++++-----
 6 files changed, 95 insertions(+), 23 deletions(-)

diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py
index c37a4f9d50..c5a423d563 100644
--- a/airflow/operators/datetime.py
+++ b/airflow/operators/datetime.py
@@ -16,6 +16,7 @@
 # under the License.
 
 import datetime
+import warnings
 from typing import Iterable, Union
 
 from airflow.exceptions import AirflowException
@@ -39,7 +40,7 @@ class BranchDateTimeOperator(BaseBranchOperator):
         ``datetime.datetime.now()`` falls below target_lower or above ``target_upper``.
     :param target_lower: target lower bound.
     :param target_upper: target upper bound.
-    :param use_task_execution_date: If ``True``, uses task's execution day to compare with targets.
+    :param use_task_logical_date: If ``True``, uses task's logical date to compare with targets.
         Execution date is useful for backfilling. If ``False``, uses system's date.
     """
 
@@ -50,6 +51,7 @@ class BranchDateTimeOperator(BaseBranchOperator):
         follow_task_ids_if_false: Union[str, Iterable[str]],
         target_lower: Union[datetime.datetime, datetime.time, None],
         target_upper: Union[datetime.datetime, datetime.time, None],
+        use_task_logical_date: bool = False,
         use_task_execution_date: bool = False,
         **kwargs,
     ) -> None:
@@ -64,10 +66,17 @@ class BranchDateTimeOperator(BaseBranchOperator):
         self.target_upper = target_upper
         self.follow_task_ids_if_true = follow_task_ids_if_true
         self.follow_task_ids_if_false = follow_task_ids_if_false
-        self.use_task_execution_date = use_task_execution_date
+        self.use_task_logical_date = use_task_logical_date
+        if use_task_execution_date:
+            self.use_task_logical_date = use_task_execution_date
+            warnings.warn(
+                "Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
 
     def choose_branch(self, context: Context) -> Union[str, Iterable[str]]:
-        if self.use_task_execution_date is True:
+        if self.use_task_logical_date:
             now = timezone.make_naive(context["logical_date"], self.dag.timezone)
         else:
             now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py
index fb35079fe0..b23d57e9fb 100644
--- a/airflow/operators/weekday.py
+++ b/airflow/operators/weekday.py
@@ -15,7 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import warnings
 from typing import Iterable, Union
 
 from airflow.operators.branch import BaseBranchOperator
@@ -41,7 +41,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
             * ``{WeekDay.TUESDAY}``
             * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
 
-    :param use_task_execution_day: If ``True``, uses task's execution day to compare
+    :param use_task_logical_date: If ``True``, uses task's logical date to compare
         with is_today. Execution Date is Useful for backfilling.
         If ``False``, uses system's day of the week.
     """
@@ -52,6 +52,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
         follow_task_ids_if_true: Union[str, Iterable[str]],
         follow_task_ids_if_false: Union[str, Iterable[str]],
         week_day: Union[str, Iterable[str]],
+        use_task_logical_date: bool = False,
         use_task_execution_day: bool = False,
         **kwargs,
     ) -> None:
@@ -59,11 +60,18 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
         self.follow_task_ids_if_true = follow_task_ids_if_true
         self.follow_task_ids_if_false = follow_task_ids_if_false
         self.week_day = week_day
-        self.use_task_execution_day = use_task_execution_day
+        self.use_task_logical_date = use_task_logical_date
+        if use_task_execution_day:
+            self.use_task_logical_date = use_task_execution_day
+            warnings.warn(
+                "Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
         self._week_day_num = WeekDay.validate_week_day(week_day)
 
     def choose_branch(self, context: Context) -> Union[str, Iterable[str]]:
-        if self.use_task_execution_day:
+        if self.use_task_logical_date:
             now = context["logical_date"]
         else:
             now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
diff --git a/airflow/sensors/weekday.py b/airflow/sensors/weekday.py
index bdf9275e10..5bb4db646f 100644
--- a/airflow/sensors/weekday.py
+++ b/airflow/sensors/weekday.py
@@ -15,6 +15,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import warnings
 
 from airflow.sensors.base import BaseSensorOperator
 from airflow.utils import timezone
@@ -33,7 +34,7 @@ class DayOfWeekSensor(BaseSensorOperator):
         weekend_check = DayOfWeekSensor(
             task_id='weekend_check',
             week_day='Saturday',
-            use_task_execution_day=True,
+            use_task_logical_date=True,
             dag=dag)
 
     **Example** (with multiple day using set): ::
@@ -41,7 +42,7 @@ class DayOfWeekSensor(BaseSensorOperator):
         weekend_check = DayOfWeekSensor(
             task_id='weekend_check',
             week_day={'Saturday', 'Sunday'},
-            use_task_execution_day=True,
+            use_task_logical_date=True,
             dag=dag)
 
     **Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): ::
@@ -52,7 +53,7 @@ class DayOfWeekSensor(BaseSensorOperator):
         weekend_check = DayOfWeekSensor(
             task_id='weekend_check',
             week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
-            use_task_execution_day=True,
+            use_task_logical_date=True,
             dag=dag)
 
     :param week_day: Day of the week to check (full name). Optionally, a set
@@ -64,16 +65,23 @@ class DayOfWeekSensor(BaseSensorOperator):
             * ``{WeekDay.TUESDAY}``
             * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
 
-    :param use_task_execution_day: If ``True``, uses task's execution day to compare
+    :param use_task_logical_date: If ``True``, uses task's logical date to compare
         with week_day. Execution Date is Useful for backfilling.
         If ``False``, uses system's day of the week. Useful when you
         don't want to run anything on weekdays on the system.
     """
 
-    def __init__(self, *, week_day, use_task_execution_day=False, **kwargs):
+    def __init__(self, *, week_day, use_task_logical_date=False, use_task_execution_day=False, **kwargs):
         super().__init__(**kwargs)
         self.week_day = week_day
-        self.use_task_execution_day = use_task_execution_day
+        self.use_task_logical_date = use_task_logical_date
+        if use_task_execution_day:
+            self.use_task_logical_date = use_task_execution_day
+            warnings.warn(
+                "Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
         self._week_day_num = WeekDay.validate_week_day(week_day)
 
     def poke(self, context: Context):
@@ -82,7 +90,7 @@ class DayOfWeekSensor(BaseSensorOperator):
             self.week_day,
             WeekDay(timezone.utcnow().isoweekday()).name,
         )
-        if self.use_task_execution_day:
+        if self.use_task_logical_date:
             return context['logical_date'].isoweekday() in self._week_day_num
         else:
             return timezone.utcnow().isoweekday() in self._week_day_num
diff --git a/tests/operators/test_datetime.py b/tests/operators/test_datetime.py
index 2bf4ff5d4a..bb1f9282a4 100644
--- a/tests/operators/test_datetime.py
+++ b/tests/operators/test_datetime.py
@@ -20,6 +20,7 @@ import datetime
 import unittest
 
 import freezegun
+import pytest
 
 from airflow.exceptions import AirflowException
 from airflow.models import DAG, DagRun, TaskInstance as TI
@@ -225,10 +226,10 @@ class TestBranchDateTimeOperator(unittest.TestCase):
                 )
 
     @freezegun.freeze_time("2020-12-01 09:00:00")
-    def test_branch_datetime_operator_use_task_execution_date(self):
+    def test_branch_datetime_operator_use_task_logical_date(self):
         """Check if BranchDateTimeOperator uses task execution date"""
         in_between_date = timezone.datetime(2020, 7, 7, 10, 30, 0)
-        self.branch_op.use_task_execution_date = True
+        self.branch_op.use_task_logical_date = True
         self.dr = self.dag.create_dagrun(
             run_id='manual_exec_date__',
             start_date=in_between_date,
@@ -249,3 +250,19 @@ class TestBranchDateTimeOperator(unittest.TestCase):
                         'branch_2': State.SKIPPED,
                     }
                 )
+
+    def test_deprecation_warning(self):
+        warning_message = (
+            """Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``."""
+        )
+        with pytest.warns(DeprecationWarning) as warnings:
+            BranchDateTimeOperator(
+                task_id='warning',
+                follow_task_ids_if_true='branch_1',
+                follow_task_ids_if_false='branch_2',
+                target_upper=timezone.datetime(2020, 7, 7, 10, 30, 0),
+                target_lower=timezone.datetime(2020, 7, 7, 10, 30, 0),
+                use_task_execution_date=True,
+                dag=self.dag,
+            )
+        assert warning_message == str(warnings[0].message)
diff --git a/tests/operators/test_weekday.py b/tests/operators/test_weekday.py
index 69ab21a6df..5185e1728c 100644
--- a/tests/operators/test_weekday.py
+++ b/tests/operators/test_weekday.py
@@ -134,14 +134,14 @@ class TestBranchDayOfWeekOperator(unittest.TestCase):
 
     @freeze_time("2021-01-25")  # Monday
     def test_branch_follow_true_with_execution_date(self):
-        """Checks if BranchDayOfWeekOperator follows true branch when set use_task_execution_day"""
+        """Checks if BranchDayOfWeekOperator follows true branch when set use_task_logical_date"""
 
         branch_op = BranchDayOfWeekOperator(
             task_id="make_choice",
             follow_task_ids_if_true="branch_1",
             follow_task_ids_if_false="branch_2",
             week_day="Wednesday",
-            use_task_execution_day=True,  # We compare to DEFAULT_DATE which is Wednesday
+            use_task_logical_date=True,  # We compare to DEFAULT_DATE which is Wednesday
             dag=self.dag,
         )
 
@@ -274,3 +274,18 @@ class TestBranchDayOfWeekOperator(unittest.TestCase):
         for ti in tis:
             if ti.task_id == 'make_choice':
                 assert ti.xcom_pull(task_ids='make_choice') == 'branch_1'
+
+    def test_deprecation_warning(self):
+        warning_message = (
+            """Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``."""
+        )
+        with pytest.warns(DeprecationWarning) as warnings:
+            BranchDayOfWeekOperator(
+                task_id="week_day_warn",
+                follow_task_ids_if_true="branch_1",
+                follow_task_ids_if_false="branch_2",
+                week_day="Monday",
+                use_task_execution_day=True,
+                dag=self.dag,
+            )
+        assert warning_message == str(warnings[0].message)
diff --git a/tests/sensors/test_weekday_sensor.py b/tests/sensors/test_weekday_sensor.py
index 04e133fa14..5aa8bdbf82 100644
--- a/tests/sensors/test_weekday_sensor.py
+++ b/tests/sensors/test_weekday_sensor.py
@@ -72,7 +72,7 @@ class TestDayOfWeekSensor(unittest.TestCase):
     )
     def test_weekday_sensor_true(self, _, week_day):
         op = DayOfWeekSensor(
-            task_id='weekday_sensor_check_true', week_day=week_day, use_task_execution_day=True, dag=self.dag
+            task_id='weekday_sensor_check_true', week_day=week_day, use_task_logical_date=True, dag=self.dag
         )
         op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)
         assert op.week_day == week_day
@@ -83,7 +83,7 @@ class TestDayOfWeekSensor(unittest.TestCase):
             poke_interval=1,
             timeout=2,
             week_day='Tuesday',
-            use_task_execution_day=True,
+            use_task_logical_date=True,
             dag=self.dag,
         )
         with pytest.raises(AirflowSensorTimeout):
@@ -95,7 +95,7 @@ class TestDayOfWeekSensor(unittest.TestCase):
             DayOfWeekSensor(
                 task_id='weekday_sensor_invalid_weekday_num',
                 week_day=invalid_week_day,
-                use_task_execution_day=True,
+                use_task_logical_date=True,
                 dag=self.dag,
             )
 
@@ -110,7 +110,7 @@ class TestDayOfWeekSensor(unittest.TestCase):
             DayOfWeekSensor(
                 task_id='weekday_sensor_check_true',
                 week_day=invalid_week_day,
-                use_task_execution_day=True,
+                use_task_logical_date=True,
                 dag=self.dag,
             )
 
@@ -120,8 +120,23 @@ class TestDayOfWeekSensor(unittest.TestCase):
             poke_interval=1,
             timeout=2,
             week_day={WeekDay.MONDAY, WeekDay.TUESDAY},
-            use_task_execution_day=True,
+            use_task_logical_date=True,
             dag=self.dag,
         )
         with pytest.raises(AirflowSensorTimeout):
             op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)
+
+    def test_deprecation_warning(self):
+        warning_message = (
+            """Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``."""
+        )
+        with pytest.warns(DeprecationWarning) as warnings:
+            DayOfWeekSensor(
+                task_id='week_day_warn',
+                poke_interval=1,
+                timeout=2,
+                week_day='Tuesday',
+                use_task_execution_day=True,
+                dag=self.dag,
+            )
+        assert warning_message == str(warnings[0].message)