You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ta...@apache.org on 2023/12/18 16:21:51 UTC

(airflow) branch pendulum-3 created (now 840d301b3c)

This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a change to branch pendulum-3
in repository https://gitbox.apache.org/repos/asf/airflow.git


      at 840d301b3c Add support of Pendulum 3

This branch includes the following new commits:

     new 840d301b3c Add support of Pendulum 3

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



(airflow) 01/01: Add support of Pendulum 3

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch pendulum-3
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 840d301b3cef362c33ad1fec7ff49f6f0fe540b4
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Mon Dec 18 20:14:59 2023 +0400

    Add support of Pendulum 3
---
 airflow/serialization/serialized_objects.py    |  7 ++--
 airflow/serialization/serializers/datetime.py  | 15 ++++----
 airflow/serialization/serializers/timezone.py  |  7 ++--
 airflow/settings.py                            | 11 +++---
 airflow/timetables/_cron.py                    | 10 +++---
 airflow/utils/sqlalchemy.py                    |  5 +--
 airflow/utils/timezone.py                      | 37 ++++++++++++++++----
 generated/provider_dependencies.json           |  2 +-
 setup.cfg                                      |  4 +--
 tests/models/test_dag.py                       |  9 ++---
 tests/sensors/test_time_sensor.py              | 17 +++++-----
 tests/serialization/test_serialized_objects.py |  2 +-
 tests/utils/test_timezone.py                   | 47 +++++++++++++++++++++++---
 13 files changed, 112 insertions(+), 61 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 48aa595933..71a4144edf 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -65,6 +65,7 @@ from airflow.utils.docs import get_docs_url
 from airflow.utils.module_loading import import_string, qualname
 from airflow.utils.operator_resources import Resources
 from airflow.utils.task_group import MappedTaskGroup, TaskGroup
+from airflow.utils.timezone import parse_timezone
 from airflow.utils.types import NOTSET, ArgNotSet
 
 if TYPE_CHECKING:
@@ -167,9 +168,9 @@ def encode_timezone(var: Timezone) -> str | int:
     )
 
 
-def decode_timezone(var: str | int) -> Timezone:
+def decode_timezone(var: str | int):
     """Decode a previously serialized Pendulum Timezone."""
-    return pendulum.tz.timezone(var)
+    return parse_timezone(var)
 
 
 def _get_registered_timetable(importable_string: str) -> type[Timetable] | None:
@@ -607,7 +608,7 @@ class BaseSerialization:
             raise TypeError(f"Invalid type {type_!s} in deserialization.")
 
     _deserialize_datetime = pendulum.from_timestamp
-    _deserialize_timezone = pendulum.tz.timezone
+    _deserialize_timezone = parse_timezone
 
     @classmethod
     def _deserialize_timedelta(cls, seconds: int) -> datetime.timedelta:
diff --git a/airflow/serialization/serializers/datetime.py b/airflow/serialization/serializers/datetime.py
index 49f0899a59..ea030a8afc 100644
--- a/airflow/serialization/serializers/datetime.py
+++ b/airflow/serialization/serializers/datetime.py
@@ -24,7 +24,7 @@ from airflow.serialization.serializers.timezone import (
     serialize as serialize_timezone,
 )
 from airflow.utils.module_loading import qualname
-from airflow.utils.timezone import convert_to_utc, is_naive
+from airflow.utils.timezone import convert_to_utc, is_naive, parse_timezone
 
 if TYPE_CHECKING:
     import datetime
@@ -65,23 +65,22 @@ def deserialize(classname: str, version: int, data: dict | str) -> datetime.date
     import datetime
 
     from pendulum import DateTime
-    from pendulum.tz import fixed_timezone, timezone
 
     tz: datetime.tzinfo | None = None
     if isinstance(data, dict) and TIMEZONE in data:
         if version == 1:
             # try to deserialize unsupported timezones
             timezone_mapping = {
-                "EDT": fixed_timezone(-4 * 3600),
-                "CDT": fixed_timezone(-5 * 3600),
-                "MDT": fixed_timezone(-6 * 3600),
-                "PDT": fixed_timezone(-7 * 3600),
-                "CEST": timezone("CET"),
+                "EDT": parse_timezone(-4 * 3600),
+                "CDT": parse_timezone(-5 * 3600),
+                "MDT": parse_timezone(-6 * 3600),
+                "PDT": parse_timezone(-7 * 3600),
+                "CEST": parse_timezone("CET"),
             }
             if data[TIMEZONE] in timezone_mapping:
                 tz = timezone_mapping[data[TIMEZONE]]
             else:
-                tz = timezone(data[TIMEZONE])
+                tz = parse_timezone(data[TIMEZONE])
         else:
             tz = deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0])
 
diff --git a/airflow/serialization/serializers/timezone.py b/airflow/serialization/serializers/timezone.py
index 23901b9d44..0f580adef8 100644
--- a/airflow/serialization/serializers/timezone.py
+++ b/airflow/serialization/serializers/timezone.py
@@ -74,7 +74,7 @@ def serialize(o: object) -> tuple[U, str, int, bool]:
 
 
 def deserialize(classname: str, version: int, data: object) -> Any:
-    from pendulum.tz import fixed_timezone, timezone
+    from airflow.utils.timezone import parse_timezone
 
     if not isinstance(data, (str, int)):
         raise TypeError(f"{data} is not of type int or str but of {type(data)}")
@@ -82,9 +82,6 @@ def deserialize(classname: str, version: int, data: object) -> Any:
     if version > __version__:
         raise TypeError(f"serialized {version} of {classname} > {__version__}")
 
-    if isinstance(data, int):
-        return fixed_timezone(data)
-
     if "zoneinfo.ZoneInfo" in classname:
         try:
             from zoneinfo import ZoneInfo
@@ -93,7 +90,7 @@ def deserialize(classname: str, version: int, data: object) -> Any:
 
         return ZoneInfo(data)
 
-    return timezone(data)
+    return parse_timezone(data)
 
 
 # ported from pendulum.tz.timezone._get_tzinfo_name
diff --git a/airflow/settings.py b/airflow/settings.py
index 1a38a59ed3..53c5cc6aa4 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -26,7 +26,6 @@ import sys
 import warnings
 from typing import TYPE_CHECKING, Any, Callable
 
-import pendulum
 import pluggy
 import sqlalchemy
 from sqlalchemy import create_engine, exc, text
@@ -40,6 +39,7 @@ from airflow.executors import executor_constants
 from airflow.logging_config import configure_logging
 from airflow.utils.orm_event_handlers import setup_event_handlers
 from airflow.utils.state import State
+from airflow.utils.timezone import local_timezone, parse_timezone, utc
 
 if TYPE_CHECKING:
     from sqlalchemy.engine import Engine
@@ -50,13 +50,12 @@ if TYPE_CHECKING:
 log = logging.getLogger(__name__)
 
 try:
-    tz = conf.get_mandatory_value("core", "default_timezone")
-    if tz == "system":
-        TIMEZONE = pendulum.tz.local_timezone()
+    if (tz := conf.get_mandatory_value("core", "default_timezone")) != "system":
+        TIMEZONE = parse_timezone(tz)
     else:
-        TIMEZONE = pendulum.tz.timezone(tz)
+        TIMEZONE = local_timezone()
 except Exception:
-    TIMEZONE = pendulum.tz.timezone("UTC")
+    TIMEZONE = utc
 
 log.info("Configured default timezone %s", TIMEZONE)
 
diff --git a/airflow/timetables/_cron.py b/airflow/timetables/_cron.py
index 45bfe3640f..15e4f820ea 100644
--- a/airflow/timetables/_cron.py
+++ b/airflow/timetables/_cron.py
@@ -19,17 +19,15 @@ from __future__ import annotations
 import datetime
 from typing import TYPE_CHECKING, Any
 
-import pendulum
 from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
 from croniter import CroniterBadCronError, CroniterBadDateError, croniter
 
 from airflow.exceptions import AirflowTimetableInvalid
 from airflow.utils.dates import cron_presets
-from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
+from airflow.utils.timezone import convert_to_utc, make_aware, make_naive, parse_timezone
 
 if TYPE_CHECKING:
-    from pendulum import DateTime
-    from pendulum.tz.timezone import Timezone
+    from pendulum import DateTime, FixedTimezone, Timezone
 
 
 def _covers_every_hour(cron: croniter) -> bool:
@@ -63,11 +61,11 @@ def _covers_every_hour(cron: croniter) -> bool:
 class CronMixin:
     """Mixin to provide interface to work with croniter."""
 
-    def __init__(self, cron: str, timezone: str | Timezone) -> None:
+    def __init__(self, cron: str, timezone: str | Timezone | FixedTimezone) -> None:
         self._expression = cron_presets.get(cron, cron)
 
         if isinstance(timezone, str):
-            timezone = pendulum.tz.timezone(timezone)
+            timezone = parse_timezone(timezone)
         self._timezone = timezone
 
         try:
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index a042d4e902..fb241f482f 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -24,7 +24,6 @@ import json
 import logging
 from typing import TYPE_CHECKING, Any, Generator, Iterable, overload
 
-import pendulum
 from dateutil import relativedelta
 from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
 from sqlalchemy.dialects import mssql, mysql
@@ -34,7 +33,7 @@ from sqlalchemy.types import JSON, Text, TypeDecorator, UnicodeText
 from airflow import settings
 from airflow.configuration import conf
 from airflow.serialization.enums import Encoding
-from airflow.utils.timezone import make_naive
+from airflow.utils.timezone import make_naive, utc
 
 if TYPE_CHECKING:
     from kubernetes.client.models.v1_pod import V1Pod
@@ -46,8 +45,6 @@ if TYPE_CHECKING:
 
 log = logging.getLogger(__name__)
 
-utc = pendulum.tz.timezone("UTC")
-
 
 class UtcDateTime(TypeDecorator):
     """
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 12c75bef59..fb32c093f8 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -23,9 +23,10 @@ from typing import overload
 import pendulum
 from dateutil.relativedelta import relativedelta
 from pendulum.datetime import DateTime
+from pendulum.tz.timezone import FixedTimezone, Timezone
 
 # UTC time zone as a tzinfo instance.
-utc = pendulum.tz.timezone("UTC")
+utc = Timezone("UTC")
 
 
 def is_localized(value):
@@ -135,12 +136,10 @@ def make_aware(value: dt.datetime | None, timezone: dt.tzinfo | None = None) ->
     # Check that we won't overwrite the timezone of an aware datetime.
     if is_localized(value):
         raise ValueError(f"make_aware expects a naive datetime, got {value}")
-    if hasattr(value, "fold"):
-        # In case of python 3.6 we want to do the same that pendulum does for python3.5
-        # i.e in case we move clock back we want to schedule the run at the time of the second
-        # instance of the same clock time rather than the first one.
-        # Fold parameter has no impact in other cases so we can safely set it to 1 here
-        value = value.replace(fold=1)
+    # When the clock is moved back, the same wall-clock time occurs twice; we want to schedule
+    # the run at the second occurrence of that clock time rather than the first one.
+    # The fold parameter has no impact in other cases, so we can safely set it to 1 here.
+    value = value.replace(fold=1)
     localized = getattr(timezone, "localize", None)
     if localized is not None:
         # This method is available for pytz time zones
@@ -273,3 +272,27 @@ def td_format(td_object: None | dt.timedelta | float | int) -> str | None:
     if not joined:
         return "<1s"
     return joined
+
+
+def parse_timezone(name: str | int) -> FixedTimezone | Timezone:
+    """
+    Parse a timezone and return the corresponding pendulum timezone object.
+
+    Provide the same interface as ``pendulum.timezone(name)``.
+
+    :param name: Either an IANA timezone name or an offset to UTC in seconds.
+
+    :meta private:
+    """
+    return pendulum.timezone(name)
+
+
+def local_timezone() -> FixedTimezone | Timezone:
+    """
+    Return local timezone.
+
+    Provide the same interface as ``pendulum.tz.local_timezone()``
+
+    :meta private:
+    """
+    return pendulum.tz.local_timezone()
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 780009c536..b755f6c839 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -418,7 +418,7 @@
       "asgiref>=3.5.2",
       "gcloud-aio-auth>=4.0.0,<5.0.0",
       "gcloud-aio-bigquery>=6.1.2",
-      "gcloud-aio-storage",
+      "gcloud-aio-storage>=9.0.0",
       "gcsfs>=2023.10.0",
       "google-ads>=22.1.0",
       "google-api-core>=2.11.0",
diff --git a/setup.cfg b/setup.cfg
index 9077a02915..3c8208af01 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -129,9 +129,7 @@ install_requires =
     opentelemetry-exporter-otlp
     packaging>=14.0
     pathspec>=0.9.0
-    # When (if) pendulum 3 released it would introduce changes in module/objects imports,
-    # since we are tightly coupled with pendulum library internally it will breaks Airflow functionality.
-    pendulum>=2.0,<3.0
+    pendulum>=3.0
     pluggy>=1.0
     psutil>=4.2.0
     pydantic>=2.3.0
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index f7bf1ad6d0..de7fa87d5a 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -37,6 +37,7 @@ import pendulum
 import pytest
 import time_machine
 from dateutil.relativedelta import relativedelta
+from pendulum.tz.timezone import Timezone
 from sqlalchemy import inspect
 
 from airflow import settings
@@ -676,8 +677,8 @@ class TestDag:
         """
         Make sure DST transitions are properly observed
         """
-        local_tz = pendulum.timezone("Europe/Zurich")
-        start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55), dst_rule=pendulum.PRE_TRANSITION)
+        local_tz = Timezone("Europe/Zurich")
+        start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55, fold=0))
         assert start.isoformat() == "2018-10-28T02:55:00+02:00", "Pre-condition: start date is in DST"
 
         utc = timezone.convert_to_utc(start)
@@ -706,7 +707,7 @@ class TestDag:
         Make sure DST transitions are properly observed
         """
         local_tz = pendulum.timezone("Europe/Zurich")
-        start = local_tz.convert(datetime.datetime(2018, 10, 27, 3), dst_rule=pendulum.PRE_TRANSITION)
+        start = local_tz.convert(datetime.datetime(2018, 10, 27, 3, fold=0))
 
         utc = timezone.convert_to_utc(start)
 
@@ -735,7 +736,7 @@ class TestDag:
         Make sure DST transitions are properly observed
         """
         local_tz = pendulum.timezone("Europe/Zurich")
-        start = local_tz.convert(datetime.datetime(2018, 3, 25, 2), dst_rule=pendulum.PRE_TRANSITION)
+        start = local_tz.convert(datetime.datetime(2018, 3, 25, 2, fold=0))
 
         utc = timezone.convert_to_utc(start)
 
diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py
index 935d1cb128..bc30e73144 100644
--- a/tests/sensors/test_time_sensor.py
+++ b/tests/sensors/test_time_sensor.py
@@ -18,12 +18,11 @@
 from __future__ import annotations
 
 from datetime import datetime, time
-from unittest.mock import patch
 
 import pendulum
 import pytest
 import time_machine
-from pendulum.tz.timezone import UTC
+from pendulum.tz.timezone import Timezone
 
 from airflow.exceptions import TaskDeferred
 from airflow.models.dag import DAG
@@ -33,7 +32,7 @@ from airflow.utils import timezone
 
 DEFAULT_TIMEZONE = "Asia/Singapore"  # UTC+08:00
 DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1)
-DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE))
+DEFAULT_DATE_WITH_TZ = datetime(2015, 1, 1, tzinfo=Timezone(DEFAULT_TIMEZONE))
 
 
 class TestTimeSensor:
@@ -46,11 +45,11 @@ class TestTimeSensor:
         ],
     )
     @time_machine.travel(timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc))
-    def test_timezone(self, default_timezone, start_date, expected):
-        with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
-            dag = DAG("test", default_args={"start_date": start_date})
-            op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
-            assert op.poke(None) == expected
+    def test_timezone(self, default_timezone, start_date, expected, monkeypatch):
+        monkeypatch.setattr("airflow.settings.TIMEZONE", Timezone(default_timezone))
+        dag = DAG("test", default_args={"start_date": start_date})
+        op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
+        assert op.poke(None) == expected
 
 
 class TestTimeSensorAsync:
@@ -85,4 +84,4 @@ class TestTimeSensorAsync:
         ):
             op = TimeSensorAsync(task_id="test", target_time=pendulum.time(9, 0))
             assert op.target_datetime.time() == pendulum.time(1, 0)
-            assert op.target_datetime.tzinfo == UTC
+            assert op.target_datetime.tzinfo == timezone.utc
diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py
index c059a8d236..96b1963579 100644
--- a/tests/serialization/test_serialized_objects.py
+++ b/tests/serialization/test_serialized_objects.py
@@ -142,7 +142,7 @@ def equal_time(a: datetime, b: datetime) -> bool:
         (1, None, equals),
         (datetime.utcnow(), DAT.DATETIME, equal_time),
         (timedelta(minutes=2), DAT.TIMEDELTA, equals),
-        (pendulum.tz.timezone("UTC"), DAT.TIMEZONE, lambda a, b: a.name == b.name),
+        (pendulum.tz.Timezone("UTC"), DAT.TIMEZONE, lambda a, b: a.name == b.name),
         (relativedelta.relativedelta(hours=+1), DAT.RELATIVEDELTA, lambda a, b: a.hours == b.hours),
         ({"test": "dict", "test-1": 1}, None, equals),
         (["array_item", 2], None, equals),
diff --git a/tests/utils/test_timezone.py b/tests/utils/test_timezone.py
index ff5ad26f5a..df8af04604 100644
--- a/tests/utils/test_timezone.py
+++ b/tests/utils/test_timezone.py
@@ -21,13 +21,14 @@ import datetime
 
 import pendulum
 import pytest
+from pendulum.tz.timezone import Timezone
 
 from airflow.utils import timezone
-from airflow.utils.timezone import coerce_datetime
+from airflow.utils.timezone import coerce_datetime, parse_timezone
 
-CET = pendulum.tz.timezone("Europe/Paris")
-EAT = pendulum.tz.timezone("Africa/Nairobi")  # Africa/Nairobi
-ICT = pendulum.tz.timezone("Asia/Bangkok")  # Asia/Bangkok
+CET = Timezone("Europe/Paris")
+EAT = Timezone("Africa/Nairobi")  # Africa/Nairobi
+ICT = Timezone("Asia/Bangkok")  # Asia/Bangkok
 UTC = timezone.utc
 
 
@@ -117,3 +118,41 @@ class TestTimezone:
 )
 def test_coerce_datetime(input_datetime, output_datetime):
     assert output_datetime == coerce_datetime(input_datetime)
+
+
+@pytest.mark.parametrize(
+    "tz_name",
+    [
+        pytest.param("Europe/Paris", id="CET"),
+        pytest.param("Africa/Nairobi", id="EAT"),
+        pytest.param("Asia/Bangkok", id="ICT"),
+    ],
+)
+def test_parse_timezone_iana(tz_name: str):
+    tz = parse_timezone(tz_name)
+    assert tz.name == tz_name
+    assert parse_timezone(tz_name) is tz
+
+
+@pytest.mark.parametrize("tz_name", ["utc", "UTC", "uTc"])
+def test_parse_timezone_utc(tz_name):
+    tz = parse_timezone(tz_name)
+    assert tz.name == "UTC"
+    assert parse_timezone(tz_name) is tz
+    assert tz is timezone.utc, "Expected that UTC timezone is same object as `airflow.utils.timezone.utc`"
+
+
+@pytest.mark.parametrize(
+    "tz_offset, expected_offset, expected_name",
+    [
+        pytest.param(0, 0, "+00:00", id="zero-offset"),
+        pytest.param(-3600, -3600, "-01:00", id="1-hour-behind"),
+        pytest.param(19800, 19800, "+05:30", id="5.5-hours-ahead"),
+    ],
+)
+def test_parse_timezone_offset(tz_offset: int, expected_offset, expected_name):
+    tz = parse_timezone(tz_offset)
+    assert hasattr(tz, "offset")
+    assert tz.offset == expected_offset
+    assert tz.name == expected_name
+    assert parse_timezone(tz_offset) is tz