Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/16 02:47:27 UTC

[GitHub] [airflow] uranusjr opened a new pull request #15397: Initial work toward AIP-39

uranusjr opened a new pull request #15397:
URL: https://github.com/apache/airflow/pull/15397


   This creates a skeleton for the AIP-39 implementation to build on, and refactors the logic handling `@once` schedules out of the DAG class.
   
   The current implementation contains a lot of duplicated code, and the PR probably shouldn’t be merged until I can pick most of `next_dagrun_after_date` apart (and ultimately eliminate it entirely and rewrite all the tests against `DAG.next_dagrun_info()` or the various TimeTable classes instead).
   
   I want to post this WIP since the trivial `@once` implementation already exposes things missing from the interface outlined in AIP-39, and I had to invent an argument (`between`) to cover it. I’d like to know what y’all think of it 🙂
   
   (Note: The current `TimeTable.next_dagrun_info()` interface is missing the session argument from AIP-39 because I don’t need it yet. It will be added when it needs to be.)
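   
   For context, `between` is the `TimeRestriction` added in `airflow/timetables/base.py`; the `@once` case essentially only needs its `earliest` bound. A minimal sketch of the idea (illustrative only, not the exact class body in this PR):
   
   ```python
   from typing import Optional
   
   from pendulum import DateTime
   
   from airflow.timetables.base import DagRunInfo, TimeRestriction
   
   
   class OnceTimeTable:
       """Run exactly once, as early as the time restriction allows."""
   
       def next_dagrun_info(
           self,
           last_automated_dagrun: Optional[DateTime],
           between: TimeRestriction,
       ) -> Optional[DagRunInfo]:
           if last_automated_dagrun is not None:
               return None  # The single run has already been created.
           if between.earliest is None:
               return None  # No start date anywhere to anchor the run on.
           if between.latest is not None and between.earliest > between.latest:
               return None  # The allowed window is empty.
           return DagRunInfo.exact(between.earliest)
   ```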
   
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r635955157



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)

Review comment:
       `not` ... `cancel` is a bit hard for me to follow (akin to a double negative).
   
   How about this as an alternative name?
   
   ```suggestion
           if not self.catchup:
               restriction = time_table.skip_to_latest(restriction)
   ```
   
   An additional thought: this should probably be handled _inside_ `time_table.next_dagrun_info`, so how about we change the signature of that to:
   
   
   ```python
       def next_dagrun_info(
           self,
           last_automated_dagrun: Optional[DateTime],
           between: TimeRestriction,
       catchup: bool,
       ) -> Optional[DagRunInfo]:
   ```
   
   I think passing in `catchup` and letting the Timetable handle it also means we don't have to have `cancel_catchup` as part of the protocol.
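   
   For illustration, a rough sketch of an interval-style timetable doing the skipping itself under that signature; only `DagRunInfo` and `TimeRestriction` come from this PR's `airflow/timetables/base.py`, the timetable class and its body are made up:
   
   ```python
   from datetime import timedelta
   from typing import Optional
   
   from pendulum import DateTime
   
   from airflow.timetables.base import DagRunInfo, TimeRestriction
   from airflow.utils import timezone
   
   
   class DeltaTimeTable:
       """Illustrative fixed-delta timetable that handles catchup itself."""
   
       def __init__(self, delta: timedelta) -> None:
           self._delta = delta
   
       def next_dagrun_info(
           self,
           last_automated_dagrun: Optional[DateTime],
           between: TimeRestriction,
           catchup: bool,
       ) -> Optional[DagRunInfo]:
           if last_automated_dagrun is not None:
               start = last_automated_dagrun + self._delta
           elif between.earliest is not None:
               start = between.earliest
           else:
               return None  # Nothing to anchor the first run on.
           if not catchup:
               # "Skip to latest": jump to the most recent complete interval
               # instead of replaying every missed one.
               now = timezone.coerce_datetime(timezone.utcnow())
               start = max(start, now - self._delta)
           if between.latest is not None and start > between.latest:
               return None
           return DagRunInfo.interval(start, start + self._delta)
   ```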

##########
File path: airflow/models/dag.py
##########
@@ -44,6 +44,7 @@
     cast,
 )
 
+import cached_property

Review comment:
       We've been using `functools.cached_property` where available (py 3.8+). (We should probably put that in a typing_compat or similar helper module so we don't have to repeat the try/except in every file that needs it; see the sketch below.)
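   
   i.e. roughly this in a shared helper module (a sketch; where exactly it should live is the open question):
   
   ```python
   try:
       from functools import cached_property  # stdlib on Python 3.8+
   except ImportError:
       from cached_property import cached_property  # third-party backport for 3.6/3.7
   ```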

##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table

Review comment:
       Is the type needed here? The property already specifies the type 🤔 

##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)
+        next_info = time_table.next_dagrun_info(
+            timezone.coerce_datetime(date_last_automated_dagrun),
+            restriction,
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
-
-    def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        return (next_info.data_interval.start, next_info.run_after)
+
+    def _format_time_restriction(self) -> TimeRestriction:

Review comment:
       `_format` to me implies a string return. How about
   
   ```suggestion
       def _get_time_restriction(self) -> TimeRestriction:
   ```

##########
File path: airflow/timetables/base.py
##########
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import NamedTuple, Optional
+
+from pendulum import DateTime
+
+from airflow.typing_compat import Protocol
+
+
+class DataInterval(NamedTuple):
+    """A data interval for a DagRun to operate over.
+
+    The represented interval is ``[start, end)``.
+    """
+
+    start: DateTime
+    end: DateTime
+
+
+class TimeRestriction(NamedTuple):
+    """A period to restrict a datetime between two values.
+
+    This is used to bound the next DagRun schedule to a time period. If the
+    scheduled time is earlier than ``earliest``, it is set to ``earliest``. If
+    the time is later than ``latest``, the DagRun is not scheduled.
+
+    Both values are inclusive; a DagRun can happen exactly at either
+    ``earliest`` or ``latest``.
+    """
+
+    earliest: Optional[DateTime]
+    latest: Optional[DateTime]
+
+
+class DagRunInfo(NamedTuple):
+    """Information to schedule a DagRun.
+
+    Instances of this will be returned by TimeTables when they are asked to
+    schedule a DagRun creation.
+    """
+
+    run_after: DateTime
+    """The earliest time this DagRun is created and its tasks scheduled."""
+
+    data_interval: DataInterval
+    """The data interval this DagRun to operate over, if applicable."""
+
+    @classmethod
+    def exact(cls, at: DateTime) -> "DagRunInfo":
+        """Represent a run on an exact time."""
+        return cls(run_after=at, data_interval=DataInterval(at, at))
+
+    @classmethod
+    def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
+        """Represent a run on a continuous schedule.
+
+        In such a schedule, each data interval starts right after the previous
+        one ends, and each run is scheduled right after the interval ends. This
+        applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
+        """
+        return cls(run_after=end, data_interval=DataInterval(start, end))
+
+
+class TimeTable(Protocol):
+    """Protocol that all TimeTable classes are expected to implement."""
+
+    def cancel_catchup(self, between: TimeRestriction) -> TimeRestriction:
+        """Fix time restriction to not perform catchup."""
+        raise NotImplementedError()
+
+    def next_dagrun_info(
+        self,
+        last_automated_dagrun: Optional[DateTime],
+        between: TimeRestriction,
+    ) -> Optional[DagRunInfo]:
+        """Provide information to schedule the next DagRun.
+
+        :param last_automated_dagrun: The execution_date of the associated DAG's
+            last scheduled or backfilled run (manual runs not considered).
+        :param later_than: The next DagRun must be scheduled later than this
+            time. This is generally the earliest of ``DAG.start_date`` and each
+            ``BaseOperator.start_date`` in the DAG. None means the next DagRun
+            can happen anytime.
+
+        :return: Information on when the next DagRun can be scheduled. None
+            means a DagRun will not happen. This does not mean no more runs
+            will be scheduled even again for this DAG; the time table can
+            return a DagRunInfo when asked later.

Review comment:
       ```suggestion
               return a DagRunInfo object when asked at another time.
   ```







[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644651378



##########
File path: airflow/ti_deps/deps/prev_dagrun_dep.py
##########
@@ -44,25 +44,16 @@ def _get_dep_statuses(self, ti, session, dep_context):
             return
 
         # Don't depend on the previous task instance if we are the first task
-        dag = ti.task.dag
-        if dag.catchup:
-            if dag.previous_schedule(ti.execution_date) is None:
-                yield self._passing_status(reason="This task does not have a schedule or is @once")
-                return
-            if dag.previous_schedule(ti.execution_date) < ti.task.start_date:

Review comment:
       Since we don’t really care about the previous schedule, we can still use `DagRun.get_previous_scheduled_dagrun()` here, and use its execution date (or something) instead of `DAG.previous_schedule()`.
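   
   Something along these lines, as a sketch; `TaskInstance.get_dagrun()` and `DagRun.get_previous_scheduled_dagrun()` are the existing methods I mean, the helper itself is only illustrative:
   
   ```python
   from typing import Optional
   
   from pendulum import DateTime
   from sqlalchemy.orm import Session
   
   from airflow.models.taskinstance import TaskInstance
   
   
   def previous_scheduled_execution_date(ti: TaskInstance, session: Session) -> Optional[DateTime]:
       """Execution date of the previous scheduled DagRun, or None for the first run."""
       previous = ti.get_dagrun(session).get_previous_scheduled_dagrun(session)
       if previous is None:
           return None
       return previous.execution_date
   ```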







[GitHub] [airflow] uranusjr commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-869379946


   I’m going to pull the trigger on this soon-ish if the checks pass; this one modifies way too many files and is causing merge conflicts left and right.





[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r621908439



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not

Review comment:
       It means something along the lines of “I hate this, but there’s no better way to do it right now without digging deep into the rabbit hole and expanding the scope of this PR beyond help; any help is welcome”. I see other projects use markers like `TODO` or `FIXME` for this as well.
   
   https://www.snellman.net/blog/archive/2017-04-17-xxx-fixme/
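   
   Purely as an illustration of the convention (these particular comments are made up, not from the codebase):
   
   ```python
   # XXX: Workaround; fixing this properly would balloon the scope of this PR.
   # TODO: Wire the session argument through once it is actually needed.
   # FIXME: This silently drops sub-second precision.
   ```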







[GitHub] [airflow] turbaszek commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r619605210



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not

Review comment:
       What does the `XXX` mean?







[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r636099219



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table

Review comment:
       I think python/mypy#10408 is the one. Let me try the generic annotation method after the `compat` package is in. I think something like this should work?
   
   ```python
   from typing import TYPE_CHECKING, Callable, TypeVar
   
   if TYPE_CHECKING:
       T = TypeVar("T")
   
       def cached_property(f: Callable[..., T]) -> T:
           ...
   ```







[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r636078330



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table

Review comment:
       https://github.com/python/mypy/pull/10408 might help when it's released, or something along the lines of https://github.com/python/mypy/issues/5858







[GitHub] [airflow] potiuk commented on pull request #15397: AIP-39: Handle DAG scheduling with timetables

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-870373357


   Woohoo!





[GitHub] [airflow] uranusjr edited a comment on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-869774220


   Man it’s so difficult to see all checks pass together. I think I’ve seen each of them pass at least once during my multiple attempts, though, so this is going in.





[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r659490781



##########
File path: airflow/exceptions.py
##########
@@ -115,6 +115,17 @@ class AirflowClusterPolicyViolation(AirflowException):
     """Raise when there is a violation of a Cluster Policy in Dag definition"""
 
 
+class AirflowTimetableInvalid(AirflowException):

Review comment:
       `AirflowTimetableInvalid` is more in line with other exceptions like `DagNotFound`, `DuplicateTaskIdFound`, etc. (Naming of existing exceptions is already a bit inconsistent, so I understand why you feel this way, though.)







[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r651685964



##########
File path: airflow/timetables/base.py
##########
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Iterator, NamedTuple, Optional
+
+from pendulum import DateTime
+
+from airflow.typing_compat import Protocol
+
+
+class DataInterval(NamedTuple):
+    """A data interval for a DagRun to operate over.
+
+    The represented interval is ``[start, end)``.
+    """
+
+    start: DateTime
+    end: DateTime
+
+
+class TimeRestriction(NamedTuple):
+    """Restriction on when a DAG can be scheduled for a run.
+
+    Specifically, the run must not be earlier than ``earliest``, nor later than
+    ``latest``. If ``catchup`` is *False*, the run must also not be earlier than
+    the current time, i.e. "missed" schedules are not backfilled.
+
+    These values are generally set on the DAG or task's ``start_date``,
+    ``end_date``, and ``catchup`` arguments.
+
+    Both ``earliest`` and ``latest`` are inclusive; a DAG run can happen exactly
+    at either point of time.
+    """
+
+    earliest: Optional[DateTime]
+    latest: Optional[DateTime]
+    catchup: bool
+
+
+class DagRunInfo(NamedTuple):
+    """Information to schedule a DagRun.
+
+    Instances of this will be returned by TimeTables when they are asked to
+    schedule a DagRun creation.
+    """
+
+    run_after: DateTime
+    """The earliest time this DagRun is created and its tasks scheduled."""
+
+    data_interval: DataInterval
+    """The data interval this DagRun to operate over, if applicable."""
+
+    @classmethod
+    def exact(cls, at: DateTime) -> "DagRunInfo":
+        """Represent a run on an exact time."""
+        return cls(run_after=at, data_interval=DataInterval(at, at))
+
+    @classmethod
+    def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
+        """Represent a run on a continuous schedule.
+
+        In such a schedule, each data interval starts right after the previous
+        one ends, and each run is scheduled right after the interval ends. This
+        applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
+        """
+        return cls(run_after=end, data_interval=DataInterval(start, end))
+
+
+class TimeTable(Protocol):

Review comment:
       Ah cool. LGTM then







[GitHub] [airflow] uranusjr commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-869774220


   Man it’s so difficult to see all tests pass together. I think I’ve seen each of them pass at least once during my multiple attempts, though, so this is going in.





[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644663968



##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       This was powering the "dag runs" drop down on the graph view etc.
   
   ![image](https://user-images.githubusercontent.com/34150/120627600-bd2f6500-c45b-11eb-9399-c1d8adb15b49.png)
   
   So this change will break the values in the drop down (and yes, it's not covered by tests; UI tests are hard/bad).







[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644601486



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)
+        next_info = time_table.next_dagrun_info(
+            timezone.coerce_datetime(date_last_automated_dagrun),
+            restriction,
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
-
-    def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        return (next_info.data_interval.start, next_info.run_after)
+
+    def _format_time_restriction(self) -> TimeRestriction:

Review comment:
       I turned this into a cached property instead since the variable is also needed in `DAG.get_run_dates()`.
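   
   Roughly this shape on the `DAG` class (a sketch reusing the `_format_time_restriction` logic above, not the exact code pushed):
   
   ```python
   # Assumes `import cached_property` at the module level, plus the existing
   # `timezone` and `TimeRestriction` imports in airflow/models/dag.py.
   @cached_property.cached_property
   def _time_restriction(self) -> TimeRestriction:
       start_dates = [t.start_date for t in self.tasks if t.start_date]
       if self.start_date is not None:
           start_dates.append(self.start_date)
       earliest = timezone.coerce_datetime(min(start_dates)) if start_dates else None
       end_dates = [t.end_date for t in self.tasks if t.end_date]
       if self.end_date is not None:
           end_dates.append(self.end_date)
       latest = timezone.coerce_datetime(max(end_dates)) if end_dates else None
       return TimeRestriction(earliest, latest)
   ```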







[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644668430



##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       I think this needs to be something like:
   
   ```sql
   SELECT execution_date FROM dag_run WHERE execution_date <= $base_date ORDER BY execution_date DESC OFFSET ? LIMIT 1
   ```
   
   with `abs(num_runs)` as the offset.
   
   I _think_
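   
   In SQLAlchemy terms, roughly (assuming the `dag`, `base_date`, `num_runs` and `session` already in scope in that view; untested):
   
   ```python
   from airflow.models import DagRun
   from airflow.utils import timezone
   
   row = (
       session.query(DagRun.execution_date)
       .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date)
       .order_by(DagRun.execution_date.desc())
       .offset(abs(num_runs))
       .limit(1)
       .one_or_none()
   )
   min_date = row[0] if row else timezone.utc_epoch()
   ```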







[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644601486



##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
   1. Don’t do `abs()` in `iter_next_n()`; raise a `ValueError` instead (see the sketch below).
   2. Change the places that rely on `iter_next_n()` returning previous runs to fetch the actual `DagRun`s instead.
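   
   For point 1, something like this (only the guard; the real `iter_next_n()` signature and body may differ):
   
   ```python
   def iter_next_n(self, start, n: int):
       if n < 1:
           raise ValueError(f"n must be a positive integer, got {n!r}")
       # ... existing forward-iteration logic unchanged ...
   ```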







[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r616988694



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +551,44 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        next_info = self._create_time_table().next_dagrun_info(
+            coerce_datetime(date_last_automated_dagrun),
+            self._format_time_restriction(),
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
-
-    def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        return (next_info.data_interval.start, next_info.run_after)
+
+    def _format_time_restriction(self) -> TimeRestriction:
+        start_dates = [t.start_date for t in self.tasks if t.start_date]
+        if self.start_date is not None:
+            start_dates.append(self.start_date)
+        if start_dates:
+            restriction_earliest = coerce_datetime(min(start_dates))
         else:
-            next_run_date = self.following_schedule(date_last_automated_dagrun)
-
-        if date_last_automated_dagrun and next_run_date:
-            while next_run_date <= date_last_automated_dagrun:
-                next_run_date = self.following_schedule(next_run_date)
-
-        # don't ever schedule prior to the dag's start_date
-        if self.start_date:
-            next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date)
-            if next_run_date == self.start_date:
-                next_run_date = self.normalize_schedule(self.start_date)
-
-            self.log.debug("Dag start date: %s. Next run date: %s", self.start_date, next_run_date)
-
-        # Don't schedule a dag beyond its end_date (as specified by the dag param)
-        if next_run_date and self.end_date and next_run_date > self.end_date:
-            return None
-
-        # Don't schedule a dag beyond its end_date (as specified by the task params)
-        # Get the min task end date, which may come from the dag.default_args
-        task_end_dates = [t.end_date for t in self.tasks if t.end_date]
-        if task_end_dates and next_run_date:
-            min_task_end_date = min(task_end_dates)
-            if next_run_date > min_task_end_date:
-                return None
-
-        return next_run_date
+            restriction_earliest = None
+        end_dates = [t.end_date for t in self.tasks if t.end_date]
+        if self.end_date is not None:
+            end_dates.append(self.end_date)
+        if end_dates:
+            restriction_latest = coerce_datetime(max(end_dates))
+        else:
+            restriction_latest = None
+        return TimeRestriction(restriction_earliest, restriction_latest)
+
+    def _create_time_table(self) -> TimeTableProtocol:

Review comment:
       ```suggestion
       @cached_property
       def time_table(self) -> TimeTableProtocol:
   ```
   
   That way, when we change to supporting setting timetables directly, less code needs to change.







[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r635957533



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table

Review comment:
       Mypy cannot detect the type for `cached_property` for some reason. Not sure if it's because we're not on 3.8+ or something else, but this becomes `Any` if I don't declare the type.







[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r621893844



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)
+        next_info = time_table.next_dagrun_info(
+            timezone.coerce_datetime(date_last_automated_dagrun),
+            restriction,
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
-
-    def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        return (next_info.data_interval.start, next_info.run_after)
+
+    def _format_time_restriction(self) -> TimeRestriction:
+        start_dates = [t.start_date for t in self.tasks if t.start_date]
+        if self.start_date is not None:
+            start_dates.append(self.start_date)
+        if start_dates:
+            restriction_earliest = timezone.coerce_datetime(min(start_dates))
         else:
-            next_run_date = self.following_schedule(date_last_automated_dagrun)
-
-        if date_last_automated_dagrun and next_run_date:
-            while next_run_date <= date_last_automated_dagrun:
-                next_run_date = self.following_schedule(next_run_date)
-
-        # don't ever schedule prior to the dag's start_date
-        if self.start_date:
-            next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date)
-            if next_run_date == self.start_date:
-                next_run_date = self.normalize_schedule(self.start_date)
-
-            self.log.debug("Dag start date: %s. Next run date: %s", self.start_date, next_run_date)
-
-        # Don't schedule a dag beyond its end_date (as specified by the dag param)
-        if next_run_date and self.end_date and next_run_date > self.end_date:
-            return None
-
-        # Don't schedule a dag beyond its end_date (as specified by the task params)
-        # Get the min task end date, which may come from the dag.default_args
-        task_end_dates = [t.end_date for t in self.tasks if t.end_date]
-        if task_end_dates and next_run_date:
-            min_task_end_date = min(task_end_dates)
-            if next_run_date > min_task_end_date:
-                return None
-
-        return next_run_date
+            restriction_earliest = None
+        end_dates = [t.end_date for t in self.tasks if t.end_date]
+        if self.end_date is not None:
+            end_dates.append(self.end_date)
+        if end_dates:
+            restriction_latest = timezone.coerce_datetime(max(end_dates))
+        else:
+            restriction_latest = None
+        return TimeRestriction(restriction_earliest, restriction_latest)
+
+    @cached_property.cached_property
+    def time_table(self) -> TimeTable:
+        interval = self.schedule_interval
+        if interval is None:
+            return NullTimeTable()
+        if interval == "@once":
+            return OnceTimeTable()
+        if not isinstance(interval, str):
+            assert isinstance(interval, (timedelta, relativedelta))

Review comment:
       I… have opinions on the decision. Assertions are not the same as if-raise, and both have different use cases. But this is not the place to argue over it.
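
   A brief illustration of the distinction (a sketch, not code from this PR; the helper names are invented): `assert` states an internal invariant and is stripped when Python runs with `-O`, while an explicit raise always executes and gives callers a meaningful exception type.

   ```python
   from datetime import timedelta

   from dateutil.relativedelta import relativedelta


   def _coerce_delta_internal(interval):
       # Internal invariant: callers are expected to have excluded None and str
       # already, so a failing assert means a bug in our own code.
       assert isinstance(interval, (timedelta, relativedelta))
       return interval


   def coerce_delta_public(interval):
       # User-facing validation: the check survives `python -O` and raises a
       # descriptive exception for bad input.
       if not isinstance(interval, (timedelta, relativedelta)):
           raise TypeError(f"unsupported schedule interval type: {type(interval).__name__}")
       return interval
   ```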




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-861418163


   I’ve also changed the `TimeTable` class names to `Timetable` for consistency (so it’s now `CronTimetable` instead of `CronTimeTable` etc.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-823763078


   [The Workflow run](https://github.com/apache/airflow/actions/runs/769404846) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-823551898


   Is your PR description (or the bit about "should not be merged") still accurate?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r651657085



##########
File path: airflow/timetables/base.py
##########
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Iterator, NamedTuple, Optional
+
+from pendulum import DateTime
+
+from airflow.typing_compat import Protocol
+
+
+class DataInterval(NamedTuple):
+    """A data interval for a DagRun to operate over.
+
+    The represented interval is ``[start, end)``.
+    """
+
+    start: DateTime
+    end: DateTime
+
+
+class TimeRestriction(NamedTuple):
+    """Restriction on when a DAG can be scheduled for a run.
+
+    Specifically, the run must not be earlier than ``earliest``, nor later than
+    ``latest``. If ``catchup`` is *False*, the run must also not be earlier than
+    the current time, i.e. "missed" schedules are not backfilled.
+
+    These values are generally set on the DAG or task's ``start_date``,
+    ``end_date``, and ``catchup`` arguments.
+
+    Both ``earliest`` and ``latest`` are inclusive; a DAG run can happen exactly
+    at either point of time.
+    """
+
+    earliest: Optional[DateTime]
+    latest: Optional[DateTime]
+    catchup: bool
+
+
+class DagRunInfo(NamedTuple):
+    """Information to schedule a DagRun.
+
+    Instances of this will be returned by TimeTables when they are asked to
+    schedule a DagRun creation.
+    """
+
+    run_after: DateTime
+    """The earliest time this DagRun is created and its tasks scheduled."""
+
+    data_interval: DataInterval
+    """The data interval this DagRun to operate over, if applicable."""
+
+    @classmethod
+    def exact(cls, at: DateTime) -> "DagRunInfo":
+        """Represent a run on an exact time."""
+        return cls(run_after=at, data_interval=DataInterval(at, at))
+
+    @classmethod
+    def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
+        """Represent a run on a continuous schedule.
+
+        In such a schedule, each data interval starts right after the previous
+        one ends, and each run is scheduled right after the interval ends. This
+        applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
+        """
+        return cls(run_after=end, data_interval=DataInterval(start, end))
+
+
+class TimeTable(Protocol):

Review comment:
       Protocols can have implementations as well (PEP 544 calls them default implementations), but adding the Protocol superclass makes Mypy complain if the protocol class is instantiated without subclassing. Another way is to make this an ABC like `BaseOperator` so instantiation fails at run time (but that incurs a larger run-time penalty).
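
   A small self-contained sketch of that behaviour, using plain `typing.Protocol` (Python 3.8+) instead of the `airflow.typing_compat` shim; the class names here are only illustrative:

   ```python
   from typing import Protocol


   class Timetable(Protocol):
       def validate(self) -> None:
           """A default implementation that concrete subclasses inherit."""
           return None


   class OnceTimetable(Timetable):
       """Explicitly subclasses the protocol, so instantiation is allowed."""


   OnceTimetable().validate()  # fine, and inherits the default validate()

   try:
       Timetable()  # mypy flags this, and it also fails at runtime
   except TypeError as err:
       print(err)  # "Protocols cannot be instantiated"
   ```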




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-850455267


   I’ve pushed some refactorings I made to migrate existing scheduling logic to be backed by TimeTables. I can split these into multiple parts if this is too much to review at once.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644664894



##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       1. Don’t do `abs()` in `iter_next_n()`; raise a `ValueError` instead (see the sketch below).
   2. Change the places that rely on `iter_next_n()` returning previous runs so they fetch the actual `DagRun`s instead.
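
   A rough sketch of point 1, written as a free-standing function since the real `iter_next_n()` signature isn't shown here:

   ```python
   from typing import Callable, Iterator

   from pendulum import DateTime


   def iter_next_n(get_next: Callable[[DateTime], DateTime], start: DateTime, n: int) -> Iterator[DateTime]:
       """Yield the next *n* schedule points after *start*."""
       if n < 0:
           # Reject negative counts instead of silently calling abs(); callers
           # that want *previous* runs should look up actual DagRun rows instead.
           raise ValueError(f"n must be non-negative, got {n}")
       current = start
       for _ in range(n):
           current = get_next(current)
           yield current
   ```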




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r636000496



##########
File path: airflow/models/dag.py
##########
@@ -44,6 +44,7 @@
     cast,
 )
 
+import cached_property

Review comment:
       I'll do that as a separate refactoring PR. This one isn't going to be merged immediately and needs a rebase anyway.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-869924890






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r651648816



##########
File path: airflow/timetables/base.py
##########
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Iterator, NamedTuple, Optional
+
+from pendulum import DateTime
+
+from airflow.typing_compat import Protocol
+
+
+class DataInterval(NamedTuple):
+    """A data interval for a DagRun to operate over.
+
+    The represented interval is ``[start, end)``.
+    """
+
+    start: DateTime
+    end: DateTime
+
+
+class TimeRestriction(NamedTuple):
+    """Restriction on when a DAG can be scheduled for a run.
+
+    Specifically, the run must not be earlier than ``earliest``, nor later than
+    ``latest``. If ``catchup`` is *False*, the run must also not be earlier than
+    the current time, i.e. "missed" schedules are not backfilled.
+
+    These values are generally set on the DAG or task's ``start_date``,
+    ``end_date``, and ``catchup`` arguments.
+
+    Both ``earliest`` and ``latest`` are inclusive; a DAG run can happen exactly
+    at either point of time.
+    """
+
+    earliest: Optional[DateTime]
+    latest: Optional[DateTime]
+    catchup: bool
+
+
+class DagRunInfo(NamedTuple):
+    """Information to schedule a DagRun.
+
+    Instances of this will be returned by TimeTables when they are asked to
+    schedule a DagRun creation.
+    """
+
+    run_after: DateTime
+    """The earliest time this DagRun is created and its tasks scheduled."""
+
+    data_interval: DataInterval
+    """The data interval this DagRun to operate over, if applicable."""
+
+    @classmethod
+    def exact(cls, at: DateTime) -> "DagRunInfo":
+        """Represent a run on an exact time."""
+        return cls(run_after=at, data_interval=DataInterval(at, at))
+
+    @classmethod
+    def interval(cls, start: DateTime, end: DateTime) -> "DagRunInfo":
+        """Represent a run on a continuous schedule.
+
+        In such a schedule, each data interval starts right after the previous
+        one ends, and each run is scheduled right after the interval ends. This
+        applies to all schedules prior to AIP-39 except ``@once`` and ``None``.
+        """
+        return cls(run_after=end, data_interval=DataInterval(start, end))
+
+
+class TimeTable(Protocol):

Review comment:
       Since this has an implementation (`iter_between`), should this just be a normal class?
   
   ```suggestion
   class TimeTable:
   ```

##########
File path: airflow/timetables/schedules.py
##########
@@ -0,0 +1,208 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import typing
+
+from cached_property import cached_property
+from croniter import CroniterBadCronError, CroniterBadDateError, croniter
+from dateutil.relativedelta import relativedelta
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.exceptions import AirflowTimeTableInvalid
+from airflow.typing_compat import Protocol
+from airflow.utils.dates import cron_presets
+from airflow.utils.timezone import convert_to_utc, make_aware, make_naive
+
+Delta = typing.Union[datetime.timedelta, relativedelta]
+
+
+class Schedule(Protocol):
+    """Base protocol for schedules."""
+
+    def skip_to_latest(self, earliest: typing.Optional[DateTime]) -> DateTime:
+        """Bound the earliest time a run can be scheduled.
+
+        This is called when ``catchup=False``. See docstring of subclasses for
+        exact skipping behaviour of a schedule.
+        """
+        raise NotImplementedError()
+
+    def validate(self) -> None:
+        """Validate the time table is correctly specified.

Review comment:
       ```suggestion
           """Validate the timetable is correctly specified.
   ```

##########
File path: airflow/timetables/interval.py
##########
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.timetables.base import DagRunInfo, TimeRestriction, TimeTable
+from airflow.timetables.schedules import CronSchedule, Delta, DeltaSchedule, Schedule
+
+
+class _DataIntervalTimeTable(TimeTable):
+    """Basis for time table implementations that schedule data intervals.
+
+    This kind of time tables create periodic data intervals from an underlying

Review comment:
       ```suggestion
       This kind of time table creates periodic data intervals from an underlying
   ```

##########
File path: tests/test_utils/timetables.py
##########
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import settings
+from airflow.timetables.interval import CronDataIntervalTimeTable, DeltaDataIntervalTimeTable
+
+
+def cron_time_table(expr: str) -> CronDataIntervalTimeTable:
+    return CronDataIntervalTimeTable(expr, settings.TIMEZONE)
+
+
+def delta_time_table(delta) -> DeltaDataIntervalTimeTable:

Review comment:
       ```suggestion
   def delta_timetable(delta) -> DeltaDataIntervalTimeTable:
   ```

##########
File path: airflow/models/dag.py
##########
@@ -630,30 +567,37 @@ def get_run_dates(self, start_date, end_date=None):
         :type start_date: datetime
         :param end_date: the end date of the interval, defaults to timezone.utcnow()
         :type end_date: datetime
+        :param align: whether the first run should be delayed to "align" with
+            the schedule, or can happen immediately at start_date (default: True
+            for top-level dags, False for subdags)
+        :type align: bool

Review comment:
       This is _forced_ to False for subdags -- do we need to set the default to `align=None` maybe?
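
   If the default were made `None`, it could be resolved along these lines (purely a sketch; `resolve_align` is not a helper in this PR):

   ```python
   from typing import Optional


   def resolve_align(align: Optional[bool], is_subdag: bool) -> bool:
       """None means 'align for top-level DAGs, never for subdags'."""
       if align is None:
           return not is_subdag
       return align
   ```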

##########
File path: airflow/timetables/simple.py
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+
+from pendulum import DateTime
+
+from airflow.timetables.base import DagRunInfo, TimeRestriction, TimeTable
+
+
+class NullTimeTable(TimeTable):
+    """Time table that never schedules anything.

Review comment:
       ```suggestion
       """Timetable that never schedules anything.
   ```

##########
File path: airflow/models/dag.py
##########
@@ -532,96 +503,62 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        next_info = self.time_table.next_dagrun_info(
+            timezone.coerce_datetime(date_last_automated_dagrun),
+            self._time_restriction,
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
+        return (next_info.data_interval.start, next_info.run_after)
 
     def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        warnings.warn(
+            "`DAG.next_dagrun_after_date()` is deprecated. Please use `DAG.next_dagrun_info()` instead.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+        return self.next_dagrun_info(date_last_automated_dagrun)[0]
+
+    @cached_property
+    def _time_restriction(self) -> TimeRestriction:
+        start_dates = [t.start_date for t in self.tasks if t.start_date]
+        if self.start_date is not None:
+            start_dates.append(self.start_date)
+        if start_dates:
+            earliest = timezone.coerce_datetime(min(start_dates))
         else:
-            next_run_date = self.following_schedule(date_last_automated_dagrun)
-
-        if date_last_automated_dagrun and next_run_date:
-            while next_run_date <= date_last_automated_dagrun:
-                next_run_date = self.following_schedule(next_run_date)
-
-        # don't ever schedule prior to the dag's start_date
-        if self.start_date:
-            next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date)
-            if next_run_date == self.start_date:
-                next_run_date = self.normalize_schedule(self.start_date)
-
-            self.log.debug("Dag start date: %s. Next run date: %s", self.start_date, next_run_date)
-
-        # Don't schedule a dag beyond its end_date (as specified by the dag param)
-        if next_run_date and self.end_date and next_run_date > self.end_date:
-            return None
-
-        # Don't schedule a dag beyond its end_date (as specified by the task params)
-        # Get the min task end date, which may come from the dag.default_args
-        task_end_dates = [t.end_date for t in self.tasks if t.end_date]
-        if task_end_dates and next_run_date:
-            min_task_end_date = min(task_end_dates)
-            if next_run_date > min_task_end_date:
-                return None
-
-        return next_run_date
-
-    def get_run_dates(self, start_date, end_date=None):
+            earliest = None
+        end_dates = [t.end_date for t in self.tasks if t.end_date]
+        if self.end_date is not None:
+            end_dates.append(self.end_date)
+        if end_dates:
+            latest = timezone.coerce_datetime(max(end_dates))
+        else:
+            latest = None
+        return TimeRestriction(earliest, latest, self.catchup)
+
+    @cached_property
+    def time_table(self) -> TimeTable:

Review comment:
       ```suggestion
       def timetable(self) -> TimeTable:
   ```
   
   Timetable is one word.
   
   (I may have missed a few instances of this in the PR)

##########
File path: airflow/timetables/simple.py
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+
+from pendulum import DateTime
+
+from airflow.timetables.base import DagRunInfo, TimeRestriction, TimeTable
+
+
+class NullTimeTable(TimeTable):
+    """Time table that never schedules anything.
+
+    This corresponds to ``schedule_interval=None``.
+    """
+
+    def __eq__(self, other: Any) -> bool:
+        """As long as *other* is of the same type."""
+        if not isinstance(other, NullTimeTable):
+            return NotImplemented
+        return True
+
+    def validate(self) -> None:
+        pass
+
+    def next_dagrun_info(
+        self,
+        last_automated_dagrun: Optional[DateTime],
+        restriction: TimeRestriction,
+    ) -> Optional[DagRunInfo]:
+        return None
+
+
+class OnceTimeTable(TimeTable):
+    """Time table that schedules the execution once as soon as possible.

Review comment:
       ```suggestion
       """Timetable that schedules the execution one time only and to run  as soon as possible.
   ```

##########
File path: tests/test_utils/timetables.py
##########
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow import settings
+from airflow.timetables.interval import CronDataIntervalTimeTable, DeltaDataIntervalTimeTable
+
+
+def cron_time_table(expr: str) -> CronDataIntervalTimeTable:

Review comment:
       ```suggestion
   def cron_timetable(expr: str) -> CronDataIntervalTimeTable:
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r640789403



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)

Review comment:
       Only your idea would work then. Cool, I’ll change it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr edited a comment on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-869774220


   Man it’s so difficult to see all checks pass together. I think I’ve seen each of them pass at least once during my multiple attempts though, so this is going in.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jhtimmins commented on pull request #15397: AIP-39: Foundational TimeTable interface, OnceTimeTable implementation

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-823056125


   @uranusjr Can you explain why the `between` argument is necessary? Just from looking at the code, I can't tell.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-823758423


   Grr, not sure what’s going on with the Static Checks job; it just says “This check failed” for me and nothing else.
   
   The Postgres check failed with
   
   ```
   Cannot start service trino: driver failed programming external connectivity on endpoint trino (87de12f0f1304fe6056905791286b37b88245be569e5be81c8f993c29560fd28):
   Error starting userland proxy: listen tcp4 0.0.0.0:38080: bind: address already in use
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r641538951



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)

Review comment:
       I decided to put the catchup argument in `TimeRestriction` instead (since that’s a time restriction as well: it basically moves the earliest possible time up to the current time).
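
   A minimal sketch of that idea, assuming the `TimeRestriction(earliest, latest, catchup)` shape shown elsewhere in this thread (the `apply_catchup` helper is invented for illustration):

   ```python
   from typing import NamedTuple, Optional

   import pendulum
   from pendulum import DateTime


   class TimeRestriction(NamedTuple):
       earliest: Optional[DateTime]
       latest: Optional[DateTime]
       catchup: bool


   def apply_catchup(restriction: TimeRestriction) -> TimeRestriction:
       """With catchup disabled, pull `earliest` up to now so missed runs are skipped."""
       if restriction.catchup:
           return restriction
       now = pendulum.now("UTC")
       if restriction.earliest is None or restriction.earliest < now:
           return restriction._replace(earliest=now)
       return restriction
   ```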




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r621908439



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not

Review comment:
       It means something along the lines of “I don’t like this but there’s no better way to do it now without digging deep into the rabbit hole and adding a bunch of unrelated diffs to this PR, any help is welcome”. I see other projects use markers like `TODO` or `FIXME` for this as well.
   
   https://www.snellman.net/blog/archive/2017-04-17-xxx-fixme/




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r617133455



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +551,44 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        next_info = self._create_time_table().next_dagrun_info(
+            coerce_datetime(date_last_automated_dagrun),
+            self._format_time_restriction(),
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
-
-    def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        return (next_info.data_interval.start, next_info.run_after)
+
+    def _format_time_restriction(self) -> TimeRestriction:
+        start_dates = [t.start_date for t in self.tasks if t.start_date]
+        if self.start_date is not None:
+            start_dates.append(self.start_date)
+        if start_dates:
+            restriction_earliest = coerce_datetime(min(start_dates))
         else:
-            next_run_date = self.following_schedule(date_last_automated_dagrun)
-
-        if date_last_automated_dagrun and next_run_date:
-            while next_run_date <= date_last_automated_dagrun:
-                next_run_date = self.following_schedule(next_run_date)
-
-        # don't ever schedule prior to the dag's start_date
-        if self.start_date:
-            next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date)
-            if next_run_date == self.start_date:
-                next_run_date = self.normalize_schedule(self.start_date)
-
-            self.log.debug("Dag start date: %s. Next run date: %s", self.start_date, next_run_date)
-
-        # Don't schedule a dag beyond its end_date (as specified by the dag param)
-        if next_run_date and self.end_date and next_run_date > self.end_date:
-            return None
-
-        # Don't schedule a dag beyond its end_date (as specified by the task params)
-        # Get the min task end date, which may come from the dag.default_args
-        task_end_dates = [t.end_date for t in self.tasks if t.end_date]
-        if task_end_dates and next_run_date:
-            min_task_end_date = min(task_end_dates)
-            if next_run_date > min_task_end_date:
-                return None
-
-        return next_run_date
+            restriction_earliest = None
+        end_dates = [t.end_date for t in self.tasks if t.end_date]
+        if self.end_date is not None:
+            end_dates.append(self.end_date)
+        if end_dates:
+            restriction_latest = coerce_datetime(max(end_dates))
+        else:
+            restriction_latest = None
+        return TimeRestriction(restriction_earliest, restriction_latest)
+
+    def _create_time_table(self) -> TimeTableProtocol:

Review comment:
       Makes sense, I was going to do this during refactorings anyway.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r621908439



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not

Review comment:
       It means something along the lines of “I don’t like this but there’s no better way to do it now without digging deep into the rabbit hole and adding a bunch of unrelated diffs to this PR, this should be refactored away later”. I see other projects use markers like `TODO` or `FIXME` for this as well.
   
   https://www.snellman.net/blog/archive/2017-04-17-xxx-fixme/




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644668430



##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       I think this needs to be something like:
   
   ```
   SELECT execution_date FROM dag_run WHERE .... ORDER BY execution_date DESC OFFSET ? LIMIT 1
   ```
   
   with `abs(num_runs)` as the offset.
   
   I _think_
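
   For reference, a rough SQLAlchemy equivalent of that query (a sketch only: the elided WHERE clause is assumed to filter on `dag_id`, and the helper name is invented):

   ```python
   from sqlalchemy import desc
   from sqlalchemy.orm import Session

   from airflow.models import DagRun


   def nth_latest_execution_date(session: Session, dag_id: str, num_runs: int):
       """Return the execution_date abs(num_runs) runs back for this DAG, or None."""
       return (
           session.query(DagRun.execution_date)
           .filter(DagRun.dag_id == dag_id)
           .order_by(desc(DagRun.execution_date))
           .offset(abs(num_runs))
           .limit(1)
           .scalar()
       )
   ```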




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #15397: AIP-39: Handle DAG scheduling with timetables

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-870366798


   :tada:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r651685788



##########
File path: airflow/timetables/interval.py
##########
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.timetables.base import DagRunInfo, TimeRestriction, TimeTable
+from airflow.timetables.schedules import CronSchedule, Delta, DeltaSchedule, Schedule
+
+
+class _DataIntervalTimeTable(TimeTable):
+    """Basis for time table implementations that schedule data intervals.
+
+    This kind of time tables create periodic data intervals from an underlying

Review comment:
       Yes please.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r640784403



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)

Review comment:
       Ah -- catchup can have an effect later on too.
   
   If I pause a dag with a daily schedule for 5 days, and then resume it:
   
   - With catchup: missing dag runs will be created
   - Without catchup: only the "latest" dag run will be created.

##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)

Review comment:
       Ah -- catchup can have an effect later on too (see the sketch after this list).
   
   If I pause a dag with a daily schedule for 5 days, and then resume it:
   
   - With catchup: missing dag runs will be created
   - Without catchup: only the "latest" dag run will be created.
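
   A toy illustration of that pause/resume behaviour, with plain daily arithmetic standing in for the real scheduler logic (the dates are made up):

   ```python
   import pendulum

   last_run = pendulum.datetime(2021, 6, 1, tz="UTC")  # last run before pausing
   now = pendulum.datetime(2021, 6, 6, tz="UTC")       # resumed five days later
   day = pendulum.duration(days=1)

   # catchup=True: every missed daily run gets created
   with_catchup = []
   next_run = last_run + day
   while next_run <= now:
       with_catchup.append(next_run)
       next_run += day

   # catchup=False: only the most recent complete interval gets a run
   without_catchup = [now - day]

   print(len(with_catchup))          # 5 runs, 2021-06-02 through 2021-06-06
   print(without_catchup[0].date())  # 2021-06-05
   ```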




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #15397: AIP-39: Foundational TimeTable interface, OnceTimeTable implementation

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-823397885


   `between: TimeRestriction` is a time interval that the next DAG run must be scheduled within, reflecting the `start_date` and `end_date` of the DAG and of the tasks in it. A run cannot be scheduled before the latest `start_date` (it must be postponed until then), nor after the earliest `end_date` (it must be cancelled). This logic is implemented in `TimeRestriction.restrict()` to adjust the schedule “preferred” by the time table.
   
   I’ve actually removed `restrict()` and put the logic inside `next_dagrun_info()` instead (will push shortly) since the restriction logic needs to be slightly different depending on whether the time table is based on a data interval or not.
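
   Conceptually, the restriction step looks something like this minimal sketch (not the removed `restrict()` itself, whose exact signature isn't shown here):

   ```python
   from typing import Optional

   from pendulum import DateTime


   def restrict(candidate: DateTime, earliest: Optional[DateTime], latest: Optional[DateTime]) -> Optional[DateTime]:
       """Postpone a run that falls before `earliest`; cancel one past `latest`."""
       if earliest is not None and candidate < earliest:
           candidate = earliest
       if latest is not None and candidate > latest:
           return None
       return candidate
   ```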


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk edited a comment on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-869924890


   > Man it’s so difficult to see all checks pass together. I think I’ve seen each of them pass at least once during my multiple attempts though, so this is going in.
   
   Should now be more stable with #16689 and #16682 merged. It's a constant struggle :)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r619605210



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not

Review comment:
       What does the `XXX` mean?

##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)
+        next_info = time_table.next_dagrun_info(
+            timezone.coerce_datetime(date_last_automated_dagrun),
+            restriction,
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
-
-    def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        return (next_info.data_interval.start, next_info.run_after)
+
+    def _format_time_restriction(self) -> TimeRestriction:
+        start_dates = [t.start_date for t in self.tasks if t.start_date]
+        if self.start_date is not None:
+            start_dates.append(self.start_date)
+        if start_dates:
+            restriction_earliest = timezone.coerce_datetime(min(start_dates))
         else:
-            next_run_date = self.following_schedule(date_last_automated_dagrun)
-
-        if date_last_automated_dagrun and next_run_date:
-            while next_run_date <= date_last_automated_dagrun:
-                next_run_date = self.following_schedule(next_run_date)
-
-        # don't ever schedule prior to the dag's start_date
-        if self.start_date:
-            next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date)
-            if next_run_date == self.start_date:
-                next_run_date = self.normalize_schedule(self.start_date)
-
-            self.log.debug("Dag start date: %s. Next run date: %s", self.start_date, next_run_date)
-
-        # Don't schedule a dag beyond its end_date (as specified by the dag param)
-        if next_run_date and self.end_date and next_run_date > self.end_date:
-            return None
-
-        # Don't schedule a dag beyond its end_date (as specified by the task params)
-        # Get the min task end date, which may come from the dag.default_args
-        task_end_dates = [t.end_date for t in self.tasks if t.end_date]
-        if task_end_dates and next_run_date:
-            min_task_end_date = min(task_end_dates)
-            if next_run_date > min_task_end_date:
-                return None
-
-        return next_run_date
+            restriction_earliest = None
+        end_dates = [t.end_date for t in self.tasks if t.end_date]
+        if self.end_date is not None:
+            end_dates.append(self.end_date)
+        if end_dates:
+            restriction_latest = timezone.coerce_datetime(max(end_dates))
+        else:
+            restriction_latest = None
+        return TimeRestriction(restriction_earliest, restriction_latest)
+
+    @cached_property.cached_property
+    def time_table(self) -> TimeTable:
+        interval = self.schedule_interval
+        if interval is None:
+            return NullTimeTable()
+        if interval == "@once":
+            return OnceTimeTable()
+        if not isinstance(interval, str):
+            assert isinstance(interval, (timedelta, relativedelta))

Review comment:
       We agreed to not use assert in production code. See:
   https://lists.apache.org/thread.html/08b64d3b084c865399f98f6c6f56235ce5329e843d97938e1a8045a5%40%3Cdev.airflow.apache.org%3E
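
    One way to follow that guideline here (a sketch only; the helper name is made up) is to raise a regular exception instead of asserting:

    ```python
    from datetime import timedelta

    from dateutil.relativedelta import relativedelta


    def _validate_delta_interval(interval):
        # A plain exception survives `python -O`, unlike an assert.
        if not isinstance(interval, (timedelta, relativedelta)):
            raise TypeError(f"unsupported schedule_interval type: {type(interval).__name__}")
        return interval
    ```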




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr merged pull request #15397: AIP-39: Handle DAG scheduling with timetables

Posted by GitBox <gi...@apache.org>.
uranusjr merged pull request #15397:
URL: https://github.com/apache/airflow/pull/15397


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-823698759


   > Is your PR description (or the bit about "should not be merged") still accurate?
   
   No it’s not, thanks for catching this. I’ve edited the top message to reflect the current status.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr edited a comment on pull request #15397: AIP-39: Foundational TimeTable interface, OnceTimeTable implementation

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-823425749


   Alright, I’ve pushed my implementation to all the possible `schedule_interval` variants in use, and this should be ready to be merged on its own. The next step would be to gradually refactor things calling into the DAG class to use the time table API, instead of accessing `schedule_interval` and methods related to it.
   
   I’ve noticed a difficulty during my initial investigation though. There are various places using `DAG.previous_schedule()`, which calculates the DAG’s *previous* run schedule (logically; no guarantee the run actually happened). This is difficult to represent with the time table design, since the time table only schedules the next run (which can be used to replace `DAG.following_schedule()`, in case someone wonders).
   
   The simple solution would be to introduce something like `TimeTable.prev_dagrun_info()` to do it, but I’m wondering, is this really a good way to calculate the “previous run”? I’d imagine I could build something like a linked list of DAG runs, so I can just query the database for when the last scheduled DAG run actually happened, instead of calculating a logical value that is not guaranteed to reflect reality…
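
    A rough sketch of the “just ask the database” idea, using the existing `DagRun` model and `DagRunType` enum rather than anything added by this PR (backfill runs would need similar treatment):

    ```python
    from airflow.models import DagRun
    from airflow.utils.types import DagRunType


    def last_scheduled_dagrun(session, dag_id):
        """Most recent scheduler-created run for a DAG, or None if there is none."""
        return (
            session.query(DagRun)
            .filter(DagRun.dag_id == dag_id, DagRun.run_type == DagRunType.SCHEDULED)
            .order_by(DagRun.execution_date.desc())
            .first()
        )
    ```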


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r617133073



##########
File path: airflow/models/dag.py
##########
@@ -109,6 +112,21 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+def coerce_datetime(v: Union[None, datetime, pendulum.DateTime]) -> Optional[pendulum.DateTime]:
+    """Convert whatever is passed in to ``pendulum.DateTime``.
+
+    This is for interfacing with the new ``timetables`` package, which
+    exclusively uses ``pendulum.DateTime`` internally.
+    """
+    if v is None:
+        return None
+    if isinstance(v, pendulum.DateTime):
+        return v
+    if v.tzinfo is None:
+        v = timezone.make_aware(v)
+    return pendulum.instance(v)

Review comment:
       My hope is actually to eventually remove this function altogether. If you looks at the annotation of `DAG.next_dagrun_info()`, the function has every intention to expect only `Optional[pendulum.DateTime]`, not `datetime.datetime`:
   
   https://github.com/apache/airflow/blob/b314c7168fbf8707c7b438b8f912ab5ed63b82ea/airflow/models/dag.py#L524-L527
   
   But someone somewhere passes in an incorrect value and breaks the promise. So that someone should be fixed instead.
   
   With that said, it’s probably not unreasonable to put the function in `airflow.utils.timezone` anyway.
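
    For reference, a tiny usage sketch assuming the helper does end up as `airflow.utils.timezone.coerce_datetime`:

    ```python
    from datetime import datetime

    from airflow.utils import timezone

    # A naive datetime becomes a timezone-aware pendulum.DateTime; None passes
    # through unchanged, and an existing pendulum.DateTime is returned as-is.
    dt = timezone.coerce_datetime(datetime(2021, 4, 16, 2, 47))
    ```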




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r639193579



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)

Review comment:
       The thing I don’t particularly like about this is that `catchup` does not make sense for any runs after the first, which to me indicates the design isn’t right. If we want to make catchup handling a part of the time table, a better interface to me would be to separate the first and subsequent DAG runs:
   
   ```python
   def first_dagrun_info(
       self,
       between: TimeRestriction,
       catchup: bool,
   ) -> Optional[DagRunInfo]:
       ...
   
   def next_dagrun_info(
       self,
       last_automated_dagrun: DateTime,
       between: TimeRestriction,
   ) -> Optional[DagRunInfo]:
       ...
   ```
   
   This especially makes sense to me because all current time table implementations contain a clear logical distinction between the first and subsequent DAG runs, also indicating maybe they should be different functions. The worry though is that maybe in the future some time table may somehow have its non-first DAG run depend on the catchup configuration? But I have difficulty coming up with a sane example.
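
    For illustration, the DAG side could dispatch over the two proposed methods roughly like this (a sketch only; `first_dagrun_info` is the hypothetical method above, and the other names come from the diff in this PR):

    ```python
    from typing import Optional

    import pendulum


    def next_dagrun_info_sketch(dag, last_automated_dagrun: Optional[pendulum.DateTime]):
        """Hypothetical DAG-side dispatch over the split proposed above."""
        time_table = dag.time_table
        restriction = dag._format_time_restriction()
        if last_automated_dagrun is None:
            return time_table.first_dagrun_info(restriction, catchup=dag.catchup)
        return time_table.next_dagrun_info(last_automated_dagrun, restriction)
    ```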




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r636099219



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table

Review comment:
       I think python/mypy#10408 is the one. Let me try the generic annotation method after the `compat` package is in. I'm thinking something like
   
   ```python
   if TYPE_CHECKING:
       T = TypeVar("T")
   
       def cached_property(f: Callable[..., T]) -> T:
           ...
   ```
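
    For completeness, a runnable sketch of how that trick could be wired up, assuming the third-party `cached_property` package is available at runtime (the class below is purely illustrative):

    ```python
    from typing import TYPE_CHECKING, Callable, TypeVar

    if TYPE_CHECKING:
        T = TypeVar("T")

        def cached_property(f: Callable[..., T]) -> T:
            ...

    else:
        from cached_property import cached_property


    class Example:
        @cached_property
        def value(self) -> int:
            return 42  # mypy now sees `Example().value` as `int`, not a descriptor
    ```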




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r651661495



##########
File path: airflow/models/dag.py
##########
@@ -630,30 +567,37 @@ def get_run_dates(self, start_date, end_date=None):
         :type start_date: datetime
         :param end_date: the end date of the interval, defaults to timezone.utcnow()
         :type end_date: datetime
+        :param align: whether the first run should be delayed to "align" with
+            the schedule, or can happen immediately at start_date (default: True
+            for top-level dags, False for subdags)
+        :type align: bool

Review comment:
       This can *never* be True for subdags, otherwise the scheduler would get stuck. I’ll change the wording instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on pull request #15397: AIP-39: Handle DAG scheduling with timetables

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-870431214


   🎉 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on pull request #15397: AIP-39: Handle DAG scheduling with timetables

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-870366798


   :tada:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644668914



##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       Possibly also adding in a "type != manual" too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #15397: AIP-39: Foundational TimeTable interface, OnceTimeTable implementation

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-823425749


   Alright, I’ve pushed my implementation to all the possible `schedule_interval` variants in use, and this should be ready to be merged on its own. The next step would be to gradually refactor things calling into the DAG class to use the time table API, instead of accessing `schedule_interval` and methods related to it.
   
   I’ve noticed a difficulty during my initial investigation though. There are various places using `DAG.previous_schedule()`, which calculates the DAG’s *previous* run schedule (logically; no guarantee the run actually happened). This is difficult to represent with the time table design, since the time table only schedules the next run (which can be used to replace `DAG.following_schedule()`, in case someone wonders).
   
   The simple solution would be to introduce something like `TimeTable.prev_dagrun_info()` to do it, but I’m wondering, is this really a good way to calculate the “previous run”? I’d imagine I could build something like a linked list of DAG runs, so I can just query when the last scheduled DAG run was from the database…


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r636007210



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)
+        next_info = time_table.next_dagrun_info(
+            timezone.coerce_datetime(date_last_automated_dagrun),
+            restriction,
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
-
-    def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        return (next_info.data_interval.start, next_info.run_after)
+
+    def _format_time_restriction(self) -> TimeRestriction:

Review comment:
       How about `_create`? `_get` to me implies the object is ready (e.g. cached), not instantiated every time the function is called.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644648007



##########
File path: airflow/ti_deps/deps/prev_dagrun_dep.py
##########
@@ -44,25 +44,16 @@ def _get_dep_statuses(self, ti, session, dep_context):
             return
 
         # Don't depend on the previous task instance if we are the first task
-        dag = ti.task.dag
-        if dag.catchup:
-            if dag.previous_schedule(ti.execution_date) is None:
-                yield self._passing_status(reason="This task does not have a schedule or is @once")
-                return
-            if dag.previous_schedule(ti.execution_date) < ti.task.start_date:

Review comment:
       This conditional was checking something like this case:
   
   ```python
   with DAG(start_date='2020-01-01'):
       task1 = Operator()
       task2 = Operator(start_date='2021-07-01', depends_on_past=True)
   ```
   
   I guess this isn't covered by the tests either :(
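
    If that guard is still wanted, it could presumably be reinstated on top of the new DagRun-based lookup with something like the helper below (illustrative only; the name is not from the PR):

    ```python
    def is_first_ti_for_task(last_dagrun, task) -> bool:
        """True if the previous automated run predates this task's own start_date."""
        if last_dagrun is None:
            return True
        return task.start_date is not None and last_dagrun.execution_date < task.start_date
    ```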

##########
File path: airflow/ti_deps/deps/prev_dagrun_dep.py
##########
@@ -44,25 +44,16 @@ def _get_dep_statuses(self, ti, session, dep_context):
             return
 
         # Don't depend on the previous task instance if we are the first task
-        dag = ti.task.dag
-        if dag.catchup:
-            if dag.previous_schedule(ti.execution_date) is None:
-                yield self._passing_status(reason="This task does not have a schedule or is @once")
-                return
-            if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
-                yield self._passing_status(
-                    reason="This task instance was the first task instance for its task."
-                )
-                return
+        dr = ti.get_dagrun(session=session)
+        if not dr:
+            last_dagrun = None
+        elif ti.task.dag.catchup:
+            last_dagrun = dr.get_previous_scheduled_dagrun(session)
         else:
-            dr = ti.get_dagrun(session=session)
-            last_dagrun = dr.get_previous_dagrun(session=session) if dr else None
-
-            if not last_dagrun:
-                yield self._passing_status(
-                    reason="This task instance was the first task instance for its task."
-                )
-                return
+            last_dagrun = dr.get_previous_dagrun(session=session)
+        if not last_dagrun:
+            yield self._passing_status(reason="This task instance was the first task instance for its task.")
+            return
 
         previous_ti = ti.get_previous_ti(session=session)

Review comment:
       This should be changed to get TIs from `last_dagrun`, otherwise we are duplicating the effort of looking up the previous dag run.

##########
File path: airflow/ti_deps/deps/prev_dagrun_dep.py
##########
@@ -44,25 +44,16 @@ def _get_dep_statuses(self, ti, session, dep_context):
             return
 
         # Don't depend on the previous task instance if we are the first task
-        dag = ti.task.dag
-        if dag.catchup:
-            if dag.previous_schedule(ti.execution_date) is None:
-                yield self._passing_status(reason="This task does not have a schedule or is @once")
-                return
-            if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
-                yield self._passing_status(
-                    reason="This task instance was the first task instance for its task."
-                )
-                return
+        dr = ti.get_dagrun(session=session)
+        if not dr:
+            last_dagrun = None
+        elif ti.task.dag.catchup:
+            last_dagrun = dr.get_previous_scheduled_dagrun(session)
         else:
-            dr = ti.get_dagrun(session=session)
-            last_dagrun = dr.get_previous_dagrun(session=session) if dr else None
-
-            if not last_dagrun:
-                yield self._passing_status(
-                    reason="This task instance was the first task instance for its task."
-                )
-                return
+            last_dagrun = dr.get_previous_dagrun(session=session)
+        if not last_dagrun:
+            yield self._passing_status(reason="This task instance was the first task instance for its task.")
+            return
 
         previous_ti = ti.get_previous_ti(session=session)

Review comment:
       ```suggestion
           previous_ti = last_dagrun.get_task_instance(ti.task_id, session=session)
   ```

##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       This was powering the "dag runs" drop down on the graph view etc.
   
   ![image](https://user-images.githubusercontent.com/34150/120627600-bd2f6500-c45b-11eb-9399-c1d8adb15b49.png)
   
   So this change will break the values in the drop down (and yes, it's not covered by tests. UI tests are hard/bad)

##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       Oh no, it's worse than that.
   
   This controls which runs are shown on these views, where "base_date" is the current one and we then look back `num_runs` older runs.

##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       I think this needs to be something like:
   
   ```
   SELECT execution_date FROM dag_run WHERE .... ORDER BY execution_date DESC OFFSET ? LIMIT 1
   ```
   
   with `abs(num_runs)` as the offset.
   
   I _think_

##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       Possibly also adding in a "type != manual" too.

##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       I think this needs to be something like:
   
   ```
   SELECT execution_date FROM dag_run WHERE execution_date <= $base_date ORDER BY execution_date DESC OFFSET ? LIMIT 1
   ```
   
   with `abs(num_runs)` as the offset.
   
   I _think_
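
    Expressed with the ORM, the suggestion above could look roughly like this (sketch only; the exact offset and the non-manual filter from the earlier comment may need adjusting):

    ```python
    from airflow.models import DagRun
    from airflow.utils.types import DagRunType


    def min_visible_date(session, dag_id, base_date, num_runs):
        """Execution date of the oldest of the last `num_runs` non-manual runs."""
        row = (
            session.query(DagRun.execution_date)
            .filter(
                DagRun.dag_id == dag_id,
                DagRun.execution_date <= base_date,
                DagRun.run_type != DagRunType.MANUAL,
            )
            .order_by(DagRun.execution_date.desc())
            .offset(max(abs(num_runs) - 1, 0))
            .limit(1)
            .first()
        )
        return row[0] if row else None
    ```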




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r640792786



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)

Review comment:
       It also makes me wonder whether I implemented something wrong and didn’t realise this. I’ll need to take another look at my changes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-869774220


   Man it’s so difficult to see all tests pass together. I think I’ve seen each of them pass at least once during my multiple attempts though, so this is going in.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644651986



##########
File path: airflow/ti_deps/deps/prev_dagrun_dep.py
##########
@@ -44,25 +44,16 @@ def _get_dep_statuses(self, ti, session, dep_context):
             return
 
         # Don't depend on the previous task instance if we are the first task
-        dag = ti.task.dag
-        if dag.catchup:
-            if dag.previous_schedule(ti.execution_date) is None:
-                yield self._passing_status(reason="This task does not have a schedule or is @once")
-                return
-            if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
-                yield self._passing_status(
-                    reason="This task instance was the first task instance for its task."
-                )
-                return
+        dr = ti.get_dagrun(session=session)
+        if not dr:
+            last_dagrun = None
+        elif ti.task.dag.catchup:
+            last_dagrun = dr.get_previous_scheduled_dagrun(session)
         else:
-            dr = ti.get_dagrun(session=session)
-            last_dagrun = dr.get_previous_dagrun(session=session) if dr else None
-
-            if not last_dagrun:
-                yield self._passing_status(
-                    reason="This task instance was the first task instance for its task."
-                )
-                return
+            last_dagrun = dr.get_previous_dagrun(session=session)
+        if not last_dagrun:
+            yield self._passing_status(reason="This task instance was the first task instance for its task.")
+            return
 
         previous_ti = ti.get_previous_ti(session=session)

Review comment:
       ```suggestion
           previous_ti = last_dagrun.get_task_instance(ti.task_id, session=session)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r639185088



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table

Review comment:
       I put the above in a `.pyi` file instead. Seems to please Mypy enough (and avoids Pylint annoyingly complaining about type stubs not conforming to Python coding styles).

##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table

Review comment:
       I put the above in a `.pyi` file instead. Seems to please Mypy enough (and avoids Pylint annoyingly complaining about type stubs not conforming to Python coding styles).
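
    For illustration, the stub could look something like this (the import path is an assumption; it just needs to resolve to the `TimeTable` base class):

    ```python
    # dag.pyi -- sits next to dag.py; declares the resolved type of the cached
    # property so mypy sees `DAG.time_table` as a TimeTable rather than the
    # decorator's internal descriptor type.
    from airflow.timetables.base import TimeTable  # assumed import path

    class DAG:
        @property
        def time_table(self) -> TimeTable: ...
    ```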




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r639193579



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)

Review comment:
       The thing I don’t particularly like about this is that `catchup` does not make sense for any runs after the first, which to me indicates the design isn’t right. If we want to make catchup handling a part of the time table, a better interface to me would be to separate the first and subsequent DAG runs:
   
   ```python
   def first_dagrun_info(
       self,
       between: TimeRestriction,
       catchup: bool,
   ) -> Optional[DagRunInfo]:
       ...
   
   def next_dagrun_info(
       self,
       last_automated_dagrun: DateTime,
       between: TimeRestriction,
   ) -> Optional[DagRunInfo]:
       ...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk commented on pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#issuecomment-869924890


   > Man it’s so difficult to see all checks pass together. I think I’ve seen each of them pass at least once during my multiple attempts though, so this is going in.
   
   Should now be more stable with #16689 merged. It's a constant struggle :)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644665052



##########
File path: airflow/www/views.py
##########
@@ -2298,8 +2298,10 @@ def duration(self, session=None):
         else:
             base_date = dag.get_latest_execution_date() or timezone.utcnow()
 
-        dates = dag.date_range(base_date, num=-abs(num_runs))
-        min_date = dates[0] if dates else timezone.utc_epoch()
+        min_date = next(
+            dag.time_table.iter_next_n(base_date, 1),
+            timezone.utc_epoch(),
+        )

Review comment:
       Oh no, it's worse than that.
   
   This controls which runs are shown on these views, where "base_date" is the current one and we then look back `num_runs` older runs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r621908439



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not

Review comment:
       It means something along the lines of “I don’t like this but there’s no better way to do it now without digging deep into the rabbit hole and adding a bunch of unrelated diffs to this PR; this should be refactored away”. I see other projects use markers like `TODO` or `FIXME` for this as well.
   
   https://www.snellman.net/blog/archive/2017-04-17-xxx-fixme/




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r616983846



##########
File path: airflow/models/dag.py
##########
@@ -109,6 +112,21 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+def coerce_datetime(v: Union[None, datetime, pendulum.DateTime]) -> Optional[pendulum.DateTime]:
+    """Convert whatever is passed in to ``pendulum.DateTime``.
+
+    This is for interfacing with the new ``timetables`` package, which
+    exclusively uses ``pendulum.DateTime`` internally.
+    """
+    if v is None:
+        return None
+    if isinstance(v, pendulum.DateTime):
+        return v
+    if v.tzinfo is None:
+        v = timezone.make_aware(v)
+    return pendulum.instance(v)

Review comment:
       This should probably live in airflow.utils.timezone
   
   (As much as I hate `utils` as a module namespace, keeping things together seems better)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r635957533



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table

Review comment:
       Mypy cannot detect the type for `cached_property` for some reason. Not sure if it's because we're not 3.8+ or what, but this becomes a weird type (internal to `cached_property`, I think) if I don't declare the type.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644651444



##########
File path: airflow/ti_deps/deps/prev_dagrun_dep.py
##########
@@ -44,25 +44,16 @@ def _get_dep_statuses(self, ti, session, dep_context):
             return
 
         # Don't depend on the previous task instance if we are the first task
-        dag = ti.task.dag
-        if dag.catchup:
-            if dag.previous_schedule(ti.execution_date) is None:
-                yield self._passing_status(reason="This task does not have a schedule or is @once")
-                return
-            if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
-                yield self._passing_status(
-                    reason="This task instance was the first task instance for its task."
-                )
-                return
+        dr = ti.get_dagrun(session=session)
+        if not dr:
+            last_dagrun = None
+        elif ti.task.dag.catchup:
+            last_dagrun = dr.get_previous_scheduled_dagrun(session)
         else:
-            dr = ti.get_dagrun(session=session)
-            last_dagrun = dr.get_previous_dagrun(session=session) if dr else None
-
-            if not last_dagrun:
-                yield self._passing_status(
-                    reason="This task instance was the first task instance for its task."
-                )
-                return
+            last_dagrun = dr.get_previous_dagrun(session=session)
+        if not last_dagrun:
+            yield self._passing_status(reason="This task instance was the first task instance for its task.")
+            return
 
         previous_ti = ti.get_previous_ti(session=session)

Review comment:
       This should be changed to get TIs from `last_dagrun`, otherwise we are duplicating the effort of looking up the previous dag run.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r644648007



##########
File path: airflow/ti_deps/deps/prev_dagrun_dep.py
##########
@@ -44,25 +44,16 @@ def _get_dep_statuses(self, ti, session, dep_context):
             return
 
         # Don't depend on the previous task instance if we are the first task
-        dag = ti.task.dag
-        if dag.catchup:
-            if dag.previous_schedule(ti.execution_date) is None:
-                yield self._passing_status(reason="This task does not have a schedule or is @once")
-                return
-            if dag.previous_schedule(ti.execution_date) < ti.task.start_date:

Review comment:
       This conditional was checking something like this case:
   
   ```python
   with DAG(start_date='2020-01-01'):
       task1 = Operator()
       task2 = Operator(start_date='2021-07-01', depends_on_past=True)
   ```
   
   I guess this isn't covered by the tests either :(
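
    A runnable version of that case might look roughly like the following (a hedged sketch: the dag_id, dates, and use of DummyOperator are illustrative stand-ins, not code from the PR):

    ```python
    import datetime

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator

    # Illustrative reconstruction of the case described above: task2 has its own,
    # later start_date plus depends_on_past=True, so the "first task instance"
    # check has to consider the task's start_date, not just the DAG's.
    with DAG(
        dag_id="mixed_start_dates_example",
        start_date=datetime.datetime(2020, 1, 1),
        schedule_interval="@daily",
    ) as dag:
        task1 = DummyOperator(task_id="task1")
        task2 = DummyOperator(
            task_id="task2",
            start_date=datetime.datetime(2021, 7, 1),
            depends_on_past=True,
        )
        task1 >> task2
    ```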







[GitHub] [airflow] malthe commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
malthe commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r635988817



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table

Review comment:
       @uranusjr you can probably annotate `cached_property` with a generic type variable to work around this.
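
    One possible shape of that workaround, as a sketch (not the annotation the PR ended up with; the class name and details here are illustrative):

    ```python
    from typing import Any, Callable, Generic, Optional, TypeVar

    T = TypeVar("T")


    class typed_cached_property(Generic[T]):
        """Minimal sketch of a generically typed cached property.

        A descriptor annotated with a type variable, so attribute access is
        typed as ``T`` instead of ``Any``.
        """

        def __init__(self, func: Callable[[Any], T]) -> None:
            self.func = func

        def __set_name__(self, owner: type, name: str) -> None:
            self.name = name

        def __get__(self, obj: Any, objtype: Optional[type] = None) -> T:
            if obj is None:
                return self  # type: ignore[return-value]
            value = self.func(obj)
            obj.__dict__[self.name] = value  # cache on the instance
            return value


    class Example:
        @typed_cached_property
        def answer(self) -> int:
            return 40 + 2  # computed once, then served from the instance dict


    print(Example().answer)  # 42, and a type checker sees it as int
    ```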







[GitHub] [airflow] ashb commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r636070240



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,54 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)
+        next_info = time_table.next_dagrun_info(
+            timezone.coerce_datetime(date_last_automated_dagrun),
+            restriction,
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
-
-    def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        return (next_info.data_interval.start, next_info.run_after)
+
+    def _format_time_restriction(self) -> TimeRestriction:

Review comment:
       Sounds good to me.







[GitHub] [airflow] uranusjr commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r651661880



##########
File path: airflow/timetables/interval.py
##########
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Optional
+
+from pendulum import DateTime
+from pendulum.tz.timezone import Timezone
+
+from airflow.timetables.base import DagRunInfo, TimeRestriction, TimeTable
+from airflow.timetables.schedules import CronSchedule, Delta, DeltaSchedule, Schedule
+
+
+class _DataIntervalTimeTable(TimeTable):
+    """Basis for time table implementations that schedule data intervals.
+
+    This kind of time tables create periodic data intervals from an underlying

Review comment:
       `timetable` to align with other usages?







[GitHub] [airflow] kaxil commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r653891667



##########
File path: airflow/exceptions.py
##########
@@ -115,6 +115,17 @@ class AirflowClusterPolicyViolation(AirflowException):
     """Raise when there is a violation of a Cluster Policy in Dag definition"""
 
 
+class AirflowTimetableInvalid(AirflowException):

Review comment:
       ```suggestion
   class AirflowInvalidTimetable(AirflowException):
   ```
   
   `AirflowTimetableInvalid` feels odd, no strong opinion though

##########
File path: airflow/models/dag.py
##########
@@ -450,219 +454,167 @@ def date_range(
         num: Optional[int] = None,
         end_date: Optional[datetime] = timezone.utcnow(),
     ) -> List[datetime]:
+        message = "`DAG.date_range()` is deprecated."
         if num is not None:
-            end_date = None
-        return utils_date_range(
-            start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval
-        )
+            result = utils_date_range(start_date=start_date, num=num)
+        else:
+            message += " Please use `DAG.get_run_dates(..., align=False)` instead."
+            result = self.get_run_dates(start_date, end_date, align=False)
+        warnings.warn(message, category=DeprecationWarning, stacklevel=2)
+        return result
 
     def is_fixed_time_schedule(self):
-        """
-        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
-
-        :return: True if the schedule has a fixed time, False if not.
-        """
-        now = datetime.now()
-        cron = croniter(self.normalized_schedule_interval, now)
-
-        start = cron.get_next(datetime)
-        cron_next = cron.get_next(datetime)
-
-        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+        warnings.warn(
+            "`DAG.is_fixed_time_schedule()` is deprecated.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+        try:
+            return not self.timetable._schedule._should_fix_dst
+        except AttributeError:
             return True
 
-        return False
-
     def following_schedule(self, dttm):
         """
         Calculates the following schedule for this dag in UTC.
 
         :param dttm: utc datetime
         :return: utc datetime
         """
-        if isinstance(self.normalized_schedule_interval, str):
-            # we don't want to rely on the transitions created by
-            # croniter as they are not always correct
-            dttm = pendulum.instance(dttm)
-            naive = timezone.make_naive(dttm, self.timezone)
-            cron = croniter(self.normalized_schedule_interval, naive)
-
-            # We assume that DST transitions happen on the minute/hour
-            if not self.is_fixed_time_schedule():
-                # relative offset (eg. every 5 minutes)
-                delta = cron.get_next(datetime) - naive
-                following = dttm.in_timezone(self.timezone) + delta
-            else:
-                # absolute (e.g. 3 AM)
-                naive = cron.get_next(datetime)
-                tz = pendulum.timezone(self.timezone.name)
-                following = timezone.make_aware(naive, tz)
-            return timezone.convert_to_utc(following)
-        elif self.normalized_schedule_interval is not None:
-            return timezone.convert_to_utc(dttm + self.normalized_schedule_interval)
+        current = pendulum.instance(dttm)
+        between = TimeRestriction(None, None, catchup=True)

Review comment:
       ```suggestion
           between = TimeRestriction(earliest=None, latest=None, catchup=True)
   ```
   
   easier to understand :) -- again not strictly needed

##########
File path: airflow/models/dag.py
##########
@@ -450,219 +454,167 @@ def date_range(
         num: Optional[int] = None,
         end_date: Optional[datetime] = timezone.utcnow(),
     ) -> List[datetime]:
+        message = "`DAG.date_range()` is deprecated."
         if num is not None:
-            end_date = None
-        return utils_date_range(
-            start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval
-        )
+            result = utils_date_range(start_date=start_date, num=num)
+        else:
+            message += " Please use `DAG.get_run_dates(..., align=False)` instead."
+            result = self.get_run_dates(start_date, end_date, align=False)
+        warnings.warn(message, category=DeprecationWarning, stacklevel=2)
+        return result
 
     def is_fixed_time_schedule(self):
-        """
-        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
-
-        :return: True if the schedule has a fixed time, False if not.
-        """
-        now = datetime.now()
-        cron = croniter(self.normalized_schedule_interval, now)
-
-        start = cron.get_next(datetime)
-        cron_next = cron.get_next(datetime)
-
-        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+        warnings.warn(
+            "`DAG.is_fixed_time_schedule()` is deprecated.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+        try:
+            return not self.timetable._schedule._should_fix_dst
+        except AttributeError:
             return True
 
-        return False
-
     def following_schedule(self, dttm):
         """
         Calculates the following schedule for this dag in UTC.
 
         :param dttm: utc datetime
         :return: utc datetime
         """
-        if isinstance(self.normalized_schedule_interval, str):
-            # we don't want to rely on the transitions created by
-            # croniter as they are not always correct
-            dttm = pendulum.instance(dttm)
-            naive = timezone.make_naive(dttm, self.timezone)
-            cron = croniter(self.normalized_schedule_interval, naive)
-
-            # We assume that DST transitions happen on the minute/hour
-            if not self.is_fixed_time_schedule():
-                # relative offset (eg. every 5 minutes)
-                delta = cron.get_next(datetime) - naive
-                following = dttm.in_timezone(self.timezone) + delta
-            else:
-                # absolute (e.g. 3 AM)
-                naive = cron.get_next(datetime)
-                tz = pendulum.timezone(self.timezone.name)
-                following = timezone.make_aware(naive, tz)
-            return timezone.convert_to_utc(following)
-        elif self.normalized_schedule_interval is not None:
-            return timezone.convert_to_utc(dttm + self.normalized_schedule_interval)
+        current = pendulum.instance(dttm)
+        between = TimeRestriction(None, None, catchup=True)
+        next_info = self.timetable.next_dagrun_info(current, between)
+        if next_info is None:
+            return None
+        return next_info.data_interval.start
 
     def previous_schedule(self, dttm):
-        """
-        Calculates the previous schedule for this dag in UTC
-
-        :param dttm: utc datetime
-        :return: utc datetime
-        """
-        if isinstance(self.normalized_schedule_interval, str):
-            # we don't want to rely on the transitions created by
-            # croniter as they are not always correct
-            dttm = pendulum.instance(dttm)
-            naive = timezone.make_naive(dttm, self.timezone)
-            cron = croniter(self.normalized_schedule_interval, naive)
-
-            # We assume that DST transitions happen on the minute/hour
-            if not self.is_fixed_time_schedule():
-                # relative offset (eg. every 5 minutes)
-                delta = naive - cron.get_prev(datetime)
-                previous = dttm.in_timezone(self.timezone) - delta
-            else:
-                # absolute (e.g. 3 AM)
-                naive = cron.get_prev(datetime)
-                tz = pendulum.timezone(self.timezone.name)
-                previous = timezone.make_aware(naive, tz)
-            return timezone.convert_to_utc(previous)
-        elif self.normalized_schedule_interval is not None:
-            return timezone.convert_to_utc(dttm - self.normalized_schedule_interval)
+        warnings.warn(
+            "`DAG.previous_schedule()` is deprecated.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+        try:
+            schedule: Schedule = self.timetable._schedule
+        except AttributeError:
+            return None
+        return schedule.get_prev(pendulum.instance(dttm))
 
     def next_dagrun_info(
         self,
         date_last_automated_dagrun: Optional[pendulum.DateTime],
     ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
-        """
-        Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the
-        execution date, and the earliest it could be scheduled
-
-        :param date_last_automated_dagrun: The max(execution_date) of existing
-            "automated" DagRuns for this dag (scheduled or backfill, but not
-            manual)
-        """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        """Get information about the next DagRun of this dag after ``date_last_automated_dagrun``.
+
+        This calculates what time interval the next DagRun should operate on
+        (its execution date), and when it can be scheduled, , according to the

Review comment:
       ```suggestion
           (its execution date), and when it can be scheduled, according to the
   ```

##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not

Review comment:
       ```suggestion
           # FIXME: The timezone.coerce_datetime calls in this function should not
   ```
   
   We use both `TODO` and `FIXME` -- either is fine for easier `grep` :)







[GitHub] [airflow] turbaszek commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r619605456



##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not
+        # be necessary since the function annotation suggests it only accepts
+        # pendulum.DateTime, and someone is passing datetime.datetime into this
+        # function. We should fix whatever is doing that.
+        if self.is_subdag:
             return (None, None)
-        next_execution_date = self.next_dagrun_after_date(date_last_automated_dagrun)
-
-        if next_execution_date is None:
+        time_table: TimeTable = self.time_table
+        restriction = self._format_time_restriction()
+        if not self.catchup:
+            restriction = time_table.cancel_catchup(restriction)
+        next_info = time_table.next_dagrun_info(
+            timezone.coerce_datetime(date_last_automated_dagrun),
+            restriction,
+        )
+        if next_info is None:
             return (None, None)
-
-        if self.schedule_interval == "@once":
-            # For "@once" it can be created "now"
-            return (next_execution_date, next_execution_date)
-
-        return (next_execution_date, self.following_schedule(next_execution_date))
-
-    def next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime]):
-        """
-        Get the next execution date after the given ``date_last_automated_dagrun``, according to
-        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
-        "concurrency" type limits, it only performs calculations based on the various date and interval fields
-        of this dag and it's tasks.
-
-        :param date_last_automated_dagrun: The execution_date of the last scheduler or
-            backfill triggered run for this dag
-        :type date_last_automated_dagrun: pendulum.Pendulum
-        """
-        if not self.schedule_interval or self.is_subdag:
-            return None
-
-        # don't schedule @once again
-        if self.schedule_interval == '@once' and date_last_automated_dagrun:
-            return None
-
-        # don't do scheduler catchup for dag's that don't have dag.catchup = True
-        if not (self.catchup or self.schedule_interval == '@once'):
-            # The logic is that we move start_date up until
-            # one period before, so that timezone.utcnow() is AFTER
-            # the period end, and the job can be created...
-            now = timezone.utcnow()
-            next_start = self.following_schedule(now)
-            last_start = self.previous_schedule(now)
-            if next_start <= now or isinstance(self.schedule_interval, timedelta):
-                new_start = last_start
-            else:
-                new_start = self.previous_schedule(last_start)
-
-            if self.start_date:
-                if new_start >= self.start_date:
-                    self.start_date = new_start
-            else:
-                self.start_date = new_start
-
-        next_run_date = None
-        if not date_last_automated_dagrun:
-            # First run
-            task_start_dates = [t.start_date for t in self.tasks if t.start_date]
-            if task_start_dates:
-                next_run_date = self.normalize_schedule(min(task_start_dates))
-                self.log.debug("Next run date based on tasks %s", next_run_date)
+        return (next_info.data_interval.start, next_info.run_after)
+
+    def _format_time_restriction(self) -> TimeRestriction:
+        start_dates = [t.start_date for t in self.tasks if t.start_date]
+        if self.start_date is not None:
+            start_dates.append(self.start_date)
+        if start_dates:
+            restriction_earliest = timezone.coerce_datetime(min(start_dates))
         else:
-            next_run_date = self.following_schedule(date_last_automated_dagrun)
-
-        if date_last_automated_dagrun and next_run_date:
-            while next_run_date <= date_last_automated_dagrun:
-                next_run_date = self.following_schedule(next_run_date)
-
-        # don't ever schedule prior to the dag's start_date
-        if self.start_date:
-            next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date)
-            if next_run_date == self.start_date:
-                next_run_date = self.normalize_schedule(self.start_date)
-
-            self.log.debug("Dag start date: %s. Next run date: %s", self.start_date, next_run_date)
-
-        # Don't schedule a dag beyond its end_date (as specified by the dag param)
-        if next_run_date and self.end_date and next_run_date > self.end_date:
-            return None
-
-        # Don't schedule a dag beyond its end_date (as specified by the task params)
-        # Get the min task end date, which may come from the dag.default_args
-        task_end_dates = [t.end_date for t in self.tasks if t.end_date]
-        if task_end_dates and next_run_date:
-            min_task_end_date = min(task_end_dates)
-            if next_run_date > min_task_end_date:
-                return None
-
-        return next_run_date
+            restriction_earliest = None
+        end_dates = [t.end_date for t in self.tasks if t.end_date]
+        if self.end_date is not None:
+            end_dates.append(self.end_date)
+        if end_dates:
+            restriction_latest = timezone.coerce_datetime(max(end_dates))
+        else:
+            restriction_latest = None
+        return TimeRestriction(restriction_earliest, restriction_latest)
+
+    @cached_property.cached_property
+    def time_table(self) -> TimeTable:
+        interval = self.schedule_interval
+        if interval is None:
+            return NullTimeTable()
+        if interval == "@once":
+            return OnceTimeTable()
+        if not isinstance(interval, str):
+            assert isinstance(interval, (timedelta, relativedelta))

Review comment:
       We agreed to not use assert in production code. See:
   https://lists.apache.org/thread.html/08b64d3b084c865399f98f6c6f56235ce5329e843d97938e1a8045a5%40%3Cdev.airflow.apache.org%3E
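
    For reference, an assert-free version of that check could look like this sketch (the exception class is a placeholder assumption; the merged code may use a dedicated timetable-validation error):

    ```python
    from datetime import timedelta

    from dateutil.relativedelta import relativedelta

    from airflow.exceptions import AirflowException


    def validate_delta_interval(interval) -> None:
        """Sketch: validate a delta-style schedule_interval without assert.

        Raising keeps the check active even under ``python -O``, which strips
        assert statements. AirflowException is an assumption here; a more
        specific exception type may be preferred.
        """
        if not isinstance(interval, (timedelta, relativedelta)):
            raise AirflowException(
                "schedule_interval must be a timedelta or relativedelta at "
                f"this point, not {type(interval).__name__}"
            )


    validate_delta_interval(timedelta(days=1))  # passes silently
    ```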



