You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/06/29 15:19:56 UTC
[airflow] 11/45: Replace `use_task_execution_date` with `use_task_logical_date` (#23983)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit aa3442824608d5936fc7cc12065b8dc3f71e5237
Author: eladkal <45...@users.noreply.github.com>
AuthorDate: Sat May 28 23:10:39 2022 +0300
Replace `use_task_execution_date` with `use_task_logical_date` (#23983)
* Replace `use_task_execution_date` with `use_task_logical_date`
We have some operators/sensors that use `*_execution_date` as the class parameters. This PR deprecate the usage of these parameters and replace it with `logical_date`.
There is no change in functionality, under the hood the functionality already uses `logical_date` this is just about the parameters name as exposed to the users.
(cherry picked from commit 614b2329c1603ef1e2199044e2cc9e4b7332c2e0)
---
airflow/operators/datetime.py | 15 ++++++++++++---
airflow/operators/weekday.py | 16 ++++++++++++----
airflow/sensors/weekday.py | 22 +++++++++++++++-------
tests/operators/test_datetime.py | 21 +++++++++++++++++++--
tests/operators/test_weekday.py | 19 +++++++++++++++++--
tests/sensors/test_weekday_sensor.py | 25 ++++++++++++++++++++-----
6 files changed, 95 insertions(+), 23 deletions(-)
diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py
index c37a4f9d50..c5a423d563 100644
--- a/airflow/operators/datetime.py
+++ b/airflow/operators/datetime.py
@@ -16,6 +16,7 @@
# under the License.
import datetime
+import warnings
from typing import Iterable, Union
from airflow.exceptions import AirflowException
@@ -39,7 +40,7 @@ class BranchDateTimeOperator(BaseBranchOperator):
``datetime.datetime.now()`` falls below target_lower or above ``target_upper``.
:param target_lower: target lower bound.
:param target_upper: target upper bound.
- :param use_task_execution_date: If ``True``, uses task's execution day to compare with targets.
+ :param use_task_logical_date: If ``True``, uses task's logical date to compare with targets.
Execution date is useful for backfilling. If ``False``, uses system's date.
"""
@@ -50,6 +51,7 @@ class BranchDateTimeOperator(BaseBranchOperator):
follow_task_ids_if_false: Union[str, Iterable[str]],
target_lower: Union[datetime.datetime, datetime.time, None],
target_upper: Union[datetime.datetime, datetime.time, None],
+ use_task_logical_date: bool = False,
use_task_execution_date: bool = False,
**kwargs,
) -> None:
@@ -64,10 +66,17 @@ class BranchDateTimeOperator(BaseBranchOperator):
self.target_upper = target_upper
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
- self.use_task_execution_date = use_task_execution_date
+ self.use_task_logical_date = use_task_logical_date
+ if use_task_execution_date:
+ self.use_task_logical_date = use_task_execution_date
+ warnings.warn(
+ "Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
def choose_branch(self, context: Context) -> Union[str, Iterable[str]]:
- if self.use_task_execution_date is True:
+ if self.use_task_logical_date:
now = timezone.make_naive(context["logical_date"], self.dag.timezone)
else:
now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
diff --git a/airflow/operators/weekday.py b/airflow/operators/weekday.py
index fb35079fe0..b23d57e9fb 100644
--- a/airflow/operators/weekday.py
+++ b/airflow/operators/weekday.py
@@ -15,7 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
+import warnings
from typing import Iterable, Union
from airflow.operators.branch import BaseBranchOperator
@@ -41,7 +41,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
* ``{WeekDay.TUESDAY}``
* ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
- :param use_task_execution_day: If ``True``, uses task's execution day to compare
+ :param use_task_logical_date: If ``True``, uses task's logical date to compare
with is_today. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week.
"""
@@ -52,6 +52,7 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
follow_task_ids_if_true: Union[str, Iterable[str]],
follow_task_ids_if_false: Union[str, Iterable[str]],
week_day: Union[str, Iterable[str]],
+ use_task_logical_date: bool = False,
use_task_execution_day: bool = False,
**kwargs,
) -> None:
@@ -59,11 +60,18 @@ class BranchDayOfWeekOperator(BaseBranchOperator):
self.follow_task_ids_if_true = follow_task_ids_if_true
self.follow_task_ids_if_false = follow_task_ids_if_false
self.week_day = week_day
- self.use_task_execution_day = use_task_execution_day
+ self.use_task_logical_date = use_task_logical_date
+ if use_task_execution_day:
+ self.use_task_logical_date = use_task_execution_day
+ warnings.warn(
+ "Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
self._week_day_num = WeekDay.validate_week_day(week_day)
def choose_branch(self, context: Context) -> Union[str, Iterable[str]]:
- if self.use_task_execution_day:
+ if self.use_task_logical_date:
now = context["logical_date"]
else:
now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
diff --git a/airflow/sensors/weekday.py b/airflow/sensors/weekday.py
index bdf9275e10..5bb4db646f 100644
--- a/airflow/sensors/weekday.py
+++ b/airflow/sensors/weekday.py
@@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import warnings
from airflow.sensors.base import BaseSensorOperator
from airflow.utils import timezone
@@ -33,7 +34,7 @@ class DayOfWeekSensor(BaseSensorOperator):
weekend_check = DayOfWeekSensor(
task_id='weekend_check',
week_day='Saturday',
- use_task_execution_day=True,
+ use_task_logical_date=True,
dag=dag)
**Example** (with multiple day using set): ::
@@ -41,7 +42,7 @@ class DayOfWeekSensor(BaseSensorOperator):
weekend_check = DayOfWeekSensor(
task_id='weekend_check',
week_day={'Saturday', 'Sunday'},
- use_task_execution_day=True,
+ use_task_logical_date=True,
dag=dag)
**Example** (with :class:`~airflow.utils.weekday.WeekDay` enum): ::
@@ -52,7 +53,7 @@ class DayOfWeekSensor(BaseSensorOperator):
weekend_check = DayOfWeekSensor(
task_id='weekend_check',
week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
- use_task_execution_day=True,
+ use_task_logical_date=True,
dag=dag)
:param week_day: Day of the week to check (full name). Optionally, a set
@@ -64,16 +65,23 @@ class DayOfWeekSensor(BaseSensorOperator):
* ``{WeekDay.TUESDAY}``
* ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
- :param use_task_execution_day: If ``True``, uses task's execution day to compare
+ :param use_task_logical_date: If ``True``, uses task's logical date to compare
with week_day. Execution Date is Useful for backfilling.
If ``False``, uses system's day of the week. Useful when you
don't want to run anything on weekdays on the system.
"""
- def __init__(self, *, week_day, use_task_execution_day=False, **kwargs):
+ def __init__(self, *, week_day, use_task_logical_date=False, use_task_execution_day=False, **kwargs):
super().__init__(**kwargs)
self.week_day = week_day
- self.use_task_execution_day = use_task_execution_day
+ self.use_task_logical_date = use_task_logical_date
+ if use_task_execution_day:
+ self.use_task_logical_date = use_task_execution_day
+ warnings.warn(
+ "Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
self._week_day_num = WeekDay.validate_week_day(week_day)
def poke(self, context: Context):
@@ -82,7 +90,7 @@ class DayOfWeekSensor(BaseSensorOperator):
self.week_day,
WeekDay(timezone.utcnow().isoweekday()).name,
)
- if self.use_task_execution_day:
+ if self.use_task_logical_date:
return context['logical_date'].isoweekday() in self._week_day_num
else:
return timezone.utcnow().isoweekday() in self._week_day_num
diff --git a/tests/operators/test_datetime.py b/tests/operators/test_datetime.py
index 2bf4ff5d4a..bb1f9282a4 100644
--- a/tests/operators/test_datetime.py
+++ b/tests/operators/test_datetime.py
@@ -20,6 +20,7 @@ import datetime
import unittest
import freezegun
+import pytest
from airflow.exceptions import AirflowException
from airflow.models import DAG, DagRun, TaskInstance as TI
@@ -225,10 +226,10 @@ class TestBranchDateTimeOperator(unittest.TestCase):
)
@freezegun.freeze_time("2020-12-01 09:00:00")
- def test_branch_datetime_operator_use_task_execution_date(self):
+ def test_branch_datetime_operator_use_task_logical_date(self):
"""Check if BranchDateTimeOperator uses task execution date"""
in_between_date = timezone.datetime(2020, 7, 7, 10, 30, 0)
- self.branch_op.use_task_execution_date = True
+ self.branch_op.use_task_logical_date = True
self.dr = self.dag.create_dagrun(
run_id='manual_exec_date__',
start_date=in_between_date,
@@ -249,3 +250,19 @@ class TestBranchDateTimeOperator(unittest.TestCase):
'branch_2': State.SKIPPED,
}
)
+
+ def test_deprecation_warning(self):
+ warning_message = (
+ """Parameter ``use_task_execution_date`` is deprecated. Use ``use_task_logical_date``."""
+ )
+ with pytest.warns(DeprecationWarning) as warnings:
+ BranchDateTimeOperator(
+ task_id='warning',
+ follow_task_ids_if_true='branch_1',
+ follow_task_ids_if_false='branch_2',
+ target_upper=timezone.datetime(2020, 7, 7, 10, 30, 0),
+ target_lower=timezone.datetime(2020, 7, 7, 10, 30, 0),
+ use_task_execution_date=True,
+ dag=self.dag,
+ )
+ assert warning_message == str(warnings[0].message)
diff --git a/tests/operators/test_weekday.py b/tests/operators/test_weekday.py
index 69ab21a6df..5185e1728c 100644
--- a/tests/operators/test_weekday.py
+++ b/tests/operators/test_weekday.py
@@ -134,14 +134,14 @@ class TestBranchDayOfWeekOperator(unittest.TestCase):
@freeze_time("2021-01-25") # Monday
def test_branch_follow_true_with_execution_date(self):
- """Checks if BranchDayOfWeekOperator follows true branch when set use_task_execution_day"""
+ """Checks if BranchDayOfWeekOperator follows true branch when set use_task_logical_date"""
branch_op = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="branch_1",
follow_task_ids_if_false="branch_2",
week_day="Wednesday",
- use_task_execution_day=True, # We compare to DEFAULT_DATE which is Wednesday
+ use_task_logical_date=True, # We compare to DEFAULT_DATE which is Wednesday
dag=self.dag,
)
@@ -274,3 +274,18 @@ class TestBranchDayOfWeekOperator(unittest.TestCase):
for ti in tis:
if ti.task_id == 'make_choice':
assert ti.xcom_pull(task_ids='make_choice') == 'branch_1'
+
+ def test_deprecation_warning(self):
+ warning_message = (
+ """Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``."""
+ )
+ with pytest.warns(DeprecationWarning) as warnings:
+ BranchDayOfWeekOperator(
+ task_id="week_day_warn",
+ follow_task_ids_if_true="branch_1",
+ follow_task_ids_if_false="branch_2",
+ week_day="Monday",
+ use_task_execution_day=True,
+ dag=self.dag,
+ )
+ assert warning_message == str(warnings[0].message)
diff --git a/tests/sensors/test_weekday_sensor.py b/tests/sensors/test_weekday_sensor.py
index 04e133fa14..5aa8bdbf82 100644
--- a/tests/sensors/test_weekday_sensor.py
+++ b/tests/sensors/test_weekday_sensor.py
@@ -72,7 +72,7 @@ class TestDayOfWeekSensor(unittest.TestCase):
)
def test_weekday_sensor_true(self, _, week_day):
op = DayOfWeekSensor(
- task_id='weekday_sensor_check_true', week_day=week_day, use_task_execution_day=True, dag=self.dag
+ task_id='weekday_sensor_check_true', week_day=week_day, use_task_logical_date=True, dag=self.dag
)
op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)
assert op.week_day == week_day
@@ -83,7 +83,7 @@ class TestDayOfWeekSensor(unittest.TestCase):
poke_interval=1,
timeout=2,
week_day='Tuesday',
- use_task_execution_day=True,
+ use_task_logical_date=True,
dag=self.dag,
)
with pytest.raises(AirflowSensorTimeout):
@@ -95,7 +95,7 @@ class TestDayOfWeekSensor(unittest.TestCase):
DayOfWeekSensor(
task_id='weekday_sensor_invalid_weekday_num',
week_day=invalid_week_day,
- use_task_execution_day=True,
+ use_task_logical_date=True,
dag=self.dag,
)
@@ -110,7 +110,7 @@ class TestDayOfWeekSensor(unittest.TestCase):
DayOfWeekSensor(
task_id='weekday_sensor_check_true',
week_day=invalid_week_day,
- use_task_execution_day=True,
+ use_task_logical_date=True,
dag=self.dag,
)
@@ -120,8 +120,23 @@ class TestDayOfWeekSensor(unittest.TestCase):
poke_interval=1,
timeout=2,
week_day={WeekDay.MONDAY, WeekDay.TUESDAY},
- use_task_execution_day=True,
+ use_task_logical_date=True,
dag=self.dag,
)
with pytest.raises(AirflowSensorTimeout):
op.run(start_date=WEEKDAY_DATE, end_date=WEEKDAY_DATE, ignore_ti_state=True)
+
+ def test_deprecation_warning(self):
+ warning_message = (
+ """Parameter ``use_task_execution_day`` is deprecated. Use ``use_task_logical_date``."""
+ )
+ with pytest.warns(DeprecationWarning) as warnings:
+ DayOfWeekSensor(
+ task_id='week_day_warn',
+ poke_interval=1,
+ timeout=2,
+ week_day='Tuesday',
+ use_task_execution_day=True,
+ dag=self.dag,
+ )
+ assert warning_message == str(warnings[0].message)