Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/03 09:06:48 UTC

[GitHub] [airflow] uranusjr opened a new pull request, #25503: Add optional data interval to CronTriggerTimetable

uranusjr opened a new pull request, #25503:
URL: https://github.com/apache/airflow/pull/25503

   Close 
   
   This allows the user to provide a (static) data interval for DAG runs to adopt. This should be useful for most cron use cases, and it is closer to the old `schedule_interval` logic. If `interval` is not provided, the default behaviour is the same as implemented in #23662.
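The sketch below illustrates the intended semantics in plain Python, with no Airflow imports. It assumes that the data interval ends at the cron trigger time and extends `interval` into the past. Under that assumption, the default zero `timedelta()` produces a zero-length interval at the trigger time. `data_interval_for` is a hypothetical helper, not part of Airflow. The sketch covers only `timedelta`, although the diff also accepts `relativedelta`.

```python
from __future__ import annotations

import datetime


def data_interval_for(
    trigger_time: datetime.datetime,
    interval: datetime.timedelta = datetime.timedelta(),
) -> tuple[datetime.datetime, datetime.datetime]:
    """Hypothetical helper: (start, end) of the data interval for a run
    triggered at ``trigger_time``, assuming the interval ends at the trigger
    time and reaches ``interval`` back into the past."""
    return trigger_time - interval, trigger_time


utc = datetime.timezone.utc
# Example trigger time, as produced by a "0 3 * * *" cron expression.
trigger = datetime.datetime(2022, 8, 3, 3, 0, tzinfo=utc)

# No interval given: start == end, a zero-length interval at the trigger time.
print(data_interval_for(trigger))
# One-day interval: the run covers the 24 hours before the trigger time.
print(data_interval_for(trigger, datetime.timedelta(days=1)))
```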
   
   Also moved CronTriggerTimetable to its own module, since having the class live in `interval` is odd.
   
   cc @mai-nakagawa


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] mai-nakagawa commented on a diff in pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
mai-nakagawa commented on code in PR #25503:
URL: https://github.com/apache/airflow/pull/25503#discussion_r937401626


##########
airflow/timetables/trigger.py:
##########
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from __future__ import annotations
+
+import datetime
+from typing import Any
+
+from dateutil.relativedelta import relativedelta
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.timetables._cron import CronMixin
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
+
+
+class CronTriggerTimetable(CronMixin, Timetable):
+    """Timetable that triggers DAG runs according to a cron expression.
+
+    This is different from ``CronDataIntervalTimetable``, where the cron
+    expression specifies the *data interval* of a DAG run. With this timetable,
+    the data intervals are specified independently from the cron expression.
+    Also for the same reason, this timetable kicks off a DAG run immediately at
+    the start of the period (similar to POSIX cron), instead of needing to wait
+    for one data interval to pass.
+
+    Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
+    """
+
+    def __init__(
+        self,
+        cron: str,
+        *,
+        timezone: str | Timezone,
+        interval: datetime.timedelta | relativedelta = datetime.timedelta(),
+    ) -> None:
+        super().__init__(cron, timezone)
+        self._interval = interval
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        from airflow.serialization.serialized_objects import decode_relativedelta, decode_timezone
+
+        interval: datetime.timedelta | relativedelta
+        if isinstance(data["interval"], dict):
+            interval = decode_relativedelta(data["interval"])
+        else:
+            interval = datetime.timedelta(seconds=data["interval"])
+        return cls(data["expression"], timezone=decode_timezone(data["timezone"]), interval=interval)
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.serialized_objects import encode_relativedelta
+
+        interval: float | dict[str, Any]
+        if isinstance(self._interval, datetime.timedelta):
+            interval = self._interval.total_seconds()
+        else:
+            interval = encode_relativedelta(self._interval)
+        return {"expression": self._expression, "timezone": self._timezone, "interval": interval}

Review Comment:
   It means you may resolve it 😄 





[GitHub] [airflow] mai-nakagawa commented on a diff in pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
mai-nakagawa commented on code in PR #25503:
URL: https://github.com/apache/airflow/pull/25503#discussion_r937401084


##########
airflow/timetables/trigger.py:
##########
@@ -0,0 +1,99 @@
+        return {"expression": self._expression, "timezone": self._timezone, "interval": interval}

Review Comment:
   Thanks. I'm leaving this thread unresolved only because I don't have permission to resolve it.





[GitHub] [airflow] uranusjr commented on pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
uranusjr commented on PR #25503:
URL: https://github.com/apache/airflow/pull/25503#issuecomment-1204960264

   I prefer to use fixup commits for fixes that are not standalone, to keep the history clean. I admit GitHub does not provide a very good UI for them, but the changes are still visible if you click on “force-pushed” in the message.




[GitHub] [airflow] uranusjr merged pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
uranusjr merged PR #25503:
URL: https://github.com/apache/airflow/pull/25503




[GitHub] [airflow] mai-nakagawa commented on a diff in pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
mai-nakagawa commented on code in PR #25503:
URL: https://github.com/apache/airflow/pull/25503#discussion_r937352145


##########
airflow/timetables/trigger.py:
##########
@@ -0,0 +1,99 @@
+        return {"expression": self._expression, "timezone": self._timezone, "interval": interval}

Review Comment:
   I think we should use `encode_timezone(self._timezone)` here. It may also be safer to add a test case or two to make sure the class is serializable.
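A minimal round-trip test along the lines suggested above might look like the sketch below. It does not use Airflow. `TimetableSketch`, `encode_tz` and `decode_tz` are hypothetical stand-ins that handle fixed-offset `datetime.timezone` values only. They are not Airflow's `encode_timezone` or `decode_timezone`. The sketch shows why the timezone has to be encoded before JSON serialization, and it checks that `expression`, `timezone` and `interval` all survive the round trip.

```python
from __future__ import annotations

import datetime
import json
from dataclasses import dataclass
from typing import Any


def encode_tz(tz: datetime.timezone) -> int:
    """Hypothetical encoder: store a fixed-offset timezone as UTC offset seconds."""
    return int(tz.utcoffset(None).total_seconds())


def decode_tz(seconds: int) -> datetime.timezone:
    """Hypothetical decoder matching encode_tz."""
    return datetime.timezone(datetime.timedelta(seconds=seconds))


@dataclass
class TimetableSketch:
    """Stand-in for a cron timetable; not Airflow's CronTriggerTimetable."""

    expression: str
    timezone: datetime.timezone
    interval: datetime.timedelta = datetime.timedelta()

    def serialize(self) -> dict[str, Any]:
        return {
            "expression": self.expression,
            # Encode the timezone; the raw tzinfo object is not JSON-serializable.
            "timezone": encode_tz(self.timezone),
            # A timedelta interval is stored as a float number of seconds.
            "interval": self.interval.total_seconds(),
        }

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> TimetableSketch:
        return cls(
            data["expression"],
            timezone=decode_tz(data["timezone"]),
            interval=datetime.timedelta(seconds=data["interval"]),
        )


original = TimetableSketch(
    "0 3 * * *",
    timezone=datetime.timezone(datetime.timedelta(hours=9)),
    interval=datetime.timedelta(days=1),
)

# Round-trip through a JSON string.
restored = TimetableSketch.deserialize(json.loads(json.dumps(original.serialize())))
assert restored == original  # dataclass equality compares all three fields

# Without encoding, json.dumps rejects the tzinfo object.
try:
    json.dumps({"timezone": original.timezone})
except TypeError as exc:
    print("not serializable:", exc)
```

One caveat applies when writing such a test against the real class. The `CronMixin.__eq__` quoted in this PR compares only `_expression` and `_timezone`, so an equality assertion on its own would not detect a dropped `interval`.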





[GitHub] [airflow] mai-nakagawa commented on a diff in pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
mai-nakagawa commented on code in PR #25503:
URL: https://github.com/apache/airflow/pull/25503#discussion_r936463509


##########
airflow/timetables/trigger.py:
##########
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from __future__ import annotations
+
+import datetime
+
+from dateutil.relativedelta import relativedelta
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.timetables._cron import CronMixin
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
+
+
+class CronTriggerTimetable(CronMixin, Timetable):
+    """Timetable that triggers runs according to a cron expression.

Review Comment:
   ```suggestion
       """Timetable that triggers DAG runs according to a cron expression.
   ```



##########
airflow/timetables/_cron.py:
##########
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from __future__ import annotations
+
+import datetime
+from typing import Any
+
+from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
+from croniter import CroniterBadCronError, CroniterBadDateError, croniter
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.compat.functools import cached_property
+from airflow.exceptions import AirflowTimetableInvalid
+from airflow.timetables.base import Timetable
+from airflow.utils.dates import cron_presets
+from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
+
+
+def _is_schedule_fixed(expression: str) -> bool:
+    """Figures out if the schedule has a fixed time (e.g. 3 AM every day).
+
+    :return: True if the schedule has a fixed time, False if not.
+
+    Detection is done by "peeking" the next two cron trigger time; if the
+    two times have the same minute and hour value, the schedule is fixed,
+    and we *don't* need to perform the DST fix.
+
+    This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
+    """
+    cron = croniter(expression)
+    next_a = cron.get_next(datetime.datetime)
+    next_b = cron.get_next(datetime.datetime)
+    return next_b.minute == next_a.minute and next_b.hour == next_a.hour
+
+
+class CronMixin:
+    """Mixin to provide interface to work with croniter."""
+
+    def __init__(self, cron: str, timezone: str | Timezone) -> None:
+        self._expression = cron_presets.get(cron, cron)
+
+        if isinstance(timezone, str):
+            timezone = Timezone(timezone)
+        self._timezone = timezone
+
+        descriptor = ExpressionDescriptor(
+            expression=self._expression, casing_type=CasingTypeEnum.Sentence, use_24hour_time_format=True
+        )
+        try:
+            # checking for more than 5 parameters in Cron and avoiding evaluation for now,
+            # as Croniter has inconsistent evaluation with other libraries
+            if len(croniter(self._expression).expanded) > 5:
+                raise FormatException()
+            interval_description = descriptor.get_description()
+        except (CroniterBadCronError, FormatException, MissingFieldException):
+            interval_description = ""
+        self.description = interval_description
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        from airflow.serialization.serialized_objects import decode_timezone
+
+        # We ignore typing on the next line because mypy expects it to return CronMixin type.
+        # However, this should return Timetable since it should only be called against a timetable subclass
+        return cls(data["expression"], decode_timezone(data["timezone"]))  # type: ignore
+
+    def __eq__(self, other: Any) -> bool:
+        """Both expression and timezone should match.
+
+        This is only for testing purposes and should not be relied on otherwise.
+        """
+        if not isinstance(other, type(self)):
+            return NotImplemented
+        return self._expression == other._expression and self._timezone == other._timezone
+
+    @property
+    def summary(self) -> str:
+        return self._expression
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.serialized_objects import encode_timezone
+
+        return {"expression": self._expression, "timezone": encode_timezone(self._timezone)}

Review Comment:
   Should we handle the new arg `interval` here (and in `deserialize` method) to make it (de)serializable?





[GitHub] [airflow] uranusjr commented on a diff in pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25503:
URL: https://github.com/apache/airflow/pull/25503#discussion_r936497475


##########
airflow/timetables/_cron.py:
##########
@@ -0,0 +1,142 @@
+        return {"expression": self._expression, "timezone": encode_timezone(self._timezone)}

Review Comment:
   Ah, I missed this, thanks for catching it!





[GitHub] [airflow] potiuk commented on a diff in pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #25503:
URL: https://github.com/apache/airflow/pull/25503#discussion_r937959740


##########
docs/apache-airflow/concepts/timetable.rst:
##########
@@ -60,21 +60,44 @@ may be available in plugins.
 CronTriggerTimetable
 ^^^^^^^^^^^^^^^^^^^^
 
-A timetable which accepts a cron expression.
+A timetable that accepts a cron expression, and triggers DAG runs according to it.
 
-Note `CronDataIntervalTimetable`_ also accepts a cron expression. See `Differences between the two cron timetables`_.
+.. seealso:: `Differences between the two cron timetables`_
 
 .. code-block:: python
 
-    from airflow.timetables.interval import CronTriggerTimetable

Review Comment:
   Not yet.





[GitHub] [airflow] uranusjr commented on a diff in pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25503:
URL: https://github.com/apache/airflow/pull/25503#discussion_r937374148


##########
airflow/timetables/trigger.py:
##########
@@ -0,0 +1,99 @@
+        return {"expression": self._expression, "timezone": self._timezone, "interval": interval}

Review Comment:
   Good idea, added.





[GitHub] [airflow] mai-nakagawa commented on pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
mai-nakagawa commented on PR #25503:
URL: https://github.com/apache/airflow/pull/25503#issuecomment-1205897480

   >  the changes are still visible if you click on “force-pushed” in the message.
   
   Oh, I missed the `compare` button on "force-pushed" messages. Thanks!




[GitHub] [airflow] ashb commented on a diff in pull request #25503: Add optional data interval to CronTriggerTimetable

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25503:
URL: https://github.com/apache/airflow/pull/25503#discussion_r937641478


##########
docs/apache-airflow/concepts/timetable.rst:
##########
@@ -60,21 +60,44 @@ may be available in plugins.
-    from airflow.timetables.interval import CronTriggerTimetable

Review Comment:
   This hasn't been released yet, i.e. it's a new class for 2.4, right?


