You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/03 10:23:42 UTC

[GitHub] [airflow] mai-nakagawa commented on a diff in pull request #25503: Add optional data interval to CronTriggerTimetable

mai-nakagawa commented on code in PR #25503:
URL: https://github.com/apache/airflow/pull/25503#discussion_r936463509


##########
airflow/timetables/trigger.py:
##########
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from __future__ import annotations
+
+import datetime
+
+from dateutil.relativedelta import relativedelta
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.timetables._cron import CronMixin
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
+
+
+class CronTriggerTimetable(CronMixin, Timetable):
+    """Timetable that triggers runs according to a cron expression.

Review Comment:
   ```suggestion
       """Timetable that triggers DAG runs according to a cron expression.
   ```



##########
airflow/timetables/_cron.py:
##########
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from __future__ import annotations
+
+import datetime
+from typing import Any
+
+from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
+from croniter import CroniterBadCronError, CroniterBadDateError, croniter
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowTimetableInvalid
+from airflow.timetables.base import Timetable
+from airflow.utils.dates import cron_presets
+from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
+
+
+def _is_schedule_fixed(expression: str) -> bool:
+    """Figures out if the schedule has a fixed time (e.g. 3 AM every day).
+
+    :return: True if the schedule has a fixed time, False if not.
+
+    Detection is done by "peeking" the next two cron trigger times; if the
+    two times have the same minute and hour value, the schedule is fixed,
+    and we *don't* need to perform the DST fix.
+
+    This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
+    """
+    cron = croniter(expression)
+    next_a = cron.get_next(datetime.datetime)
+    next_b = cron.get_next(datetime.datetime)
+    return next_b.minute == next_a.minute and next_b.hour == next_a.hour
+
+
+class CronMixin:
+    """Mixin to provide interface to work with croniter."""
+
+    def __init__(self, cron: str, timezone: str | Timezone) -> None:
+        self._expression = cron_presets.get(cron, cron)
+
+        if isinstance(timezone, str):
+            timezone = Timezone(timezone)
+        self._timezone = timezone
+
+        descriptor = ExpressionDescriptor(
+            expression=self._expression, casing_type=CasingTypeEnum.Sentence, use_24hour_time_format=True
+        )
+        try:
+            # checking for more than 5 parameters in Cron and avoiding evaluation for now,
+            # as Croniter has inconsistent evaluation with other libraries
+            if len(croniter(self._expression).expanded) > 5:
+                raise FormatException()
+            interval_description = descriptor.get_description()
+        except (CroniterBadCronError, FormatException, MissingFieldException):
+            interval_description = ""
+        self.description = interval_description
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        from airflow.serialization.serialized_objects import decode_timezone
+
+        # We ignore typing on the next line because mypy expects it to return CronMixin type.
+        # However, this should return Timetable since it should only be called against a timetable subclass
+        return cls(data["expression"], decode_timezone(data["timezone"]))  # type: ignore
+
+    def __eq__(self, other: Any) -> bool:
+        """Both expression and timezone should match.
+
+        This is only for testing purposes and should not be relied on otherwise.
+        """
+        if not isinstance(other, type(self)):
+            return NotImplemented
+        return self._expression == other._expression and self._timezone == other._timezone
+
+    @property
+    def summary(self) -> str:
+        return self._expression
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.serialized_objects import encode_timezone
+
+        return {"expression": self._expression, "timezone": encode_timezone(self._timezone)}

Review Comment:
   Should we handle the new `interval` argument here (and in the `deserialize` method) so that it is (de)serializable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org