You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/27 19:37:41 UTC

[GitHub] [airflow] jhtimmins opened a new pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

jhtimmins opened a new pull request #13930:
URL: https://github.com/apache/airflow/pull/13930


   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#discussion_r567835695



##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
+            _schedule_interval = cron_presets.get(self.schedule_interval)  # type: Optional[ScheduleInterval]
+        elif self.schedule_interval == '@once':
+            _schedule_interval = None
+        else:
+            _schedule_interval = self.schedule_interval
+        return _schedule_interval
+
+    def is_fixed_time_schedule(self):
+        """
+        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
+
+        :return: True if the schedule has a fixed time, False if not.
+        """
+        now = datetime.now()
+        cron = croniter(self.normalized_schedule_interval(), now)
+
+        start = cron.get_next(datetime)
+        cron_next = cron.get_next(datetime)
+
+        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+            return True
+
+        return False
+
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        if num is not None:
+            end_date = None
+        return utils_date_range(
+            start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval()
+        )
+
+    def following_schedule(self, dttm):
+        """
+        Calculates the following schedule for this dag in UTC.
+
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
+        if isinstance(self.normalized_schedule_interval(), str):
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)
+            naive = timezone.make_naive(dttm, self.timezone)
+            cron = croniter(self.normalized_schedule_interval(), naive)
+
+            # We assume that DST transitions happen on the minute/hour

Review comment:
       Not a new assumption -- this is just moving the existing code around.
   
   So far, yes, this is a reasonable assumption though, yes




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#discussion_r567838256



##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,308 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
+            _schedule_interval = cron_presets.get(self.schedule_interval)  # type: Optional[ScheduleInterval]
+        elif self.schedule_interval == '@once':
+            _schedule_interval = None
+        else:
+            _schedule_interval = self.schedule_interval
+        return _schedule_interval
+
+    def is_fixed_time_schedule(self):
+        """
+        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
+
+        :return: True if the schedule has a fixed time, False if not.
+        """
+        now = datetime.now()
+        cron = croniter(self.normalized_schedule_interval(), now)
+
+        start = cron.get_next(datetime)
+        cron_next = cron.get_next(datetime)
+
+        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+            return True
+
+        return False
+
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        if num is not None:
+            end_date = None
+        return utils_date_range(
+            start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval()
+        )
+
+    def following_schedule(self, dttm):
+        """
+        Calculates the following schedule for this dag in UTC.
+
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
+        if isinstance(self.normalized_schedule_interval(), str):
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)
+            naive = timezone.make_naive(dttm, self.timezone)
+            cron = croniter(self.normalized_schedule_interval(), naive)
+
+            # We assume that DST transitions happen on the minute/hour
+            if not self.is_fixed_time_schedule():
+                # relative offset (eg. every 5 minutes)
+                delta = cron.get_next(datetime) - naive
+                following = dttm.in_timezone(self.timezone) + delta
+            else:
+                # absolute (e.g. 3 AM)
+                naive = cron.get_next(datetime)
+                tz = pendulum.timezone(self.timezone.name)
+                following = timezone.make_aware(naive, tz)
+            return timezone.convert_to_utc(following)
+        elif self.normalized_schedule_interval() is not None:
+            return timezone.convert_to_utc(dttm + self.normalized_schedule_interval())
+
+    def previous_schedule(self, dttm):
+        """
+        Calculates the previous schedule for this dag in UTC
+
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
+        if isinstance(self.normalized_schedule_interval(), str):
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)
+            naive = timezone.make_naive(dttm, self.timezone)
+            cron = croniter(self.normalized_schedule_interval(), naive)
+
+            # We assume that DST transitions happen on the minute/hour
+            if not self.is_fixed_time_schedule():
+                # relative offset (eg. every 5 minutes)
+                delta = naive - cron.get_prev(datetime)
+                previous = dttm.in_timezone(self.timezone) - delta
+            else:
+                # absolute (e.g. 3 AM)
+                naive = cron.get_prev(datetime)
+                tz = pendulum.timezone(self.timezone.name)
+                previous = timezone.make_aware(naive, tz)
+            return timezone.convert_to_utc(previous)
+        elif self.normalized_schedule_interval() is not None:
+            return timezone.convert_to_utc(dttm - self.normalized_schedule_interval())
+
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        """
+        Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the
+        execution date, and the earliest it could be scheduled
+
+        :param date_last_automated_dagrun: The max(execution_date) of existing
+            "automated" DagRuns for this dag (scheduled or backfill, but not
+            manual)
+        """
+        if (
+            self.schedule_interval == "@once" and date_last_automated_dagrun
+        ) or self.schedule_interval is None:
+            # Manual trigger, or already created the run for @once, can short circuit
+            return (None, None)
+        next_execution_date = self._next_dagrun_after_date(date_last_automated_dagrun, tasks)
+
+        if next_execution_date is None:
+            return (None, None)
+
+        if self.schedule_interval == "@once":
+            # For "@once" it can be created "now"
+            return (next_execution_date, next_execution_date)
+
+        return (next_execution_date, self.following_schedule(next_execution_date))
+
+    def _next_dagrun_after_date(self, date_last_automated_dagrun: Optional[pendulum.DateTime], tasks):
+        """
+        Get the next execution date after the given ``date_last_automated_dagrun``, according to
+        schedule_interval, start_date, end_date etc.  This doesn't check max active run or any other
+        "concurrency" type limits, it only performs calculations based on the various date and interval fields
+        of this dag and it's tasks.
+
+        :param date_last_automated_dagrun: The execution_date of the last scheduler or
+            backfill triggered run for this dag
+        :type date_last_automated_dagrun: pendulum.Pendulum
+        """
+        if not self.schedule_interval:
+            return None
+
+        # don't schedule @once again
+        if self.schedule_interval == '@once' and date_last_automated_dagrun:
+            return None
+
+        # don't do scheduler catchup for dag's that don't have dag.catchup = True
+        if not (self.catchup or self.schedule_interval == '@once'):
+            # The logic is that we move start_date up until
+            # one period before, so that timezone.utcnow() is AFTER
+            # the period end, and the job can be created...
+            now = timezone.utcnow()
+            next_start = self.following_schedule(now)
+            last_start = self.previous_schedule(now)
+            if next_start <= now or isinstance(self.schedule_interval, timedelta):
+                new_start = last_start
+            else:
+                new_start = self.previous_schedule(last_start)
+
+            if self.start_date:
+                if new_start >= self.start_date:
+                    self.start_date = new_start
+            else:
+                self.start_date = new_start
+
+        next_run_date = None
+        if not date_last_automated_dagrun:
+            # First run
+            task_start_dates = [t.start_date for t in tasks if t.start_date]
+            if task_start_dates:
+                next_run_date = self._normalize_schedule(min(task_start_dates))
+                self.log.debug("Next run date based on tasks %s", next_run_date)
+        else:
+            next_run_date = self.following_schedule(date_last_automated_dagrun)
+
+        if date_last_automated_dagrun and next_run_date:
+            while next_run_date <= date_last_automated_dagrun:
+                next_run_date = self.following_schedule(next_run_date)
+
+        # don't ever schedule prior to the dag's start_date
+        if self.start_date:
+            next_run_date = self.start_date if not next_run_date else max(next_run_date, self.start_date)
+            if next_run_date == self.start_date:
+                next_run_date = self._normalize_schedule(self.start_date)
+
+            self.log.debug("Dag start date: %s. Next run date: %s", self.start_date, next_run_date)
+
+        # Don't schedule a dag beyond its end_date (as specified by the dag param)
+        if next_run_date and self.end_date and next_run_date > self.end_date:
+            return None
+
+        # Don't schedule a dag beyond its end_date (as specified by the task params)
+        # Get the min task end date, which may come from the dag.default_args
+        task_end_dates = [t.end_date for t in tasks if t.end_date]
+        if task_end_dates and next_run_date:
+            min_task_end_date = min(task_end_dates)
+            if next_run_date > min_task_end_date:
+                return None

Review comment:
       Some of this feels like it should be in the base timetable class -- or at least, each timetable subclass shouldn't be responsible for implementing this check etc.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#issuecomment-777670704


   [The Workflow run](https://github.com/apache/airflow/actions/runs/558607751) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#issuecomment-768601869


   [The Workflow run](https://github.com/apache/airflow/actions/runs/516247430) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#issuecomment-808986726


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#discussion_r567834660



##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:

Review comment:
       Backport what?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#issuecomment-777671184


   [The Workflow run](https://github.com/apache/airflow/actions/runs/558527178) is cancelling this PR. Building image for the PR has been cancelled


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dimberman commented on a change in pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

Posted by GitBox <gi...@apache.org>.
dimberman commented on a change in pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#discussion_r565662528



##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:

Review comment:
       this function name feels a bit vague to me. I would ad docs to the abstract class or consider making it mroe descriptive.

##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:

Review comment:
       Will this be a necessary step for anyone looking to backport? Is there a way we can make this easier?

##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
+            _schedule_interval = cron_presets.get(self.schedule_interval)  # type: Optional[ScheduleInterval]
+        elif self.schedule_interval == '@once':
+            _schedule_interval = None
+        else:
+            _schedule_interval = self.schedule_interval
+        return _schedule_interval
+
+    def is_fixed_time_schedule(self):
+        """
+        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
+
+        :return: True if the schedule has a fixed time, False if not.
+        """
+        now = datetime.now()
+        cron = croniter(self.normalized_schedule_interval(), now)
+
+        start = cron.get_next(datetime)
+        cron_next = cron.get_next(datetime)
+
+        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+            return True
+
+        return False
+
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        if num is not None:
+            end_date = None
+        return utils_date_range(
+            start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval()
+        )
+
+    def following_schedule(self, dttm):
+        """
+        Calculates the following schedule for this dag in UTC.
+
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
+        if isinstance(self.normalized_schedule_interval(), str):
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)
+            naive = timezone.make_naive(dttm, self.timezone)
+            cron = croniter(self.normalized_schedule_interval(), naive)
+
+            # We assume that DST transitions happen on the minute/hour

Review comment:
       Is that a reasonable assumption to make?

##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
+            _schedule_interval = cron_presets.get(self.schedule_interval)  # type: Optional[ScheduleInterval]
+        elif self.schedule_interval == '@once':
+            _schedule_interval = None
+        else:
+            _schedule_interval = self.schedule_interval
+        return _schedule_interval
+
+    def is_fixed_time_schedule(self):
+        """
+        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
+
+        :return: True if the schedule has a fixed time, False if not.
+        """
+        now = datetime.now()
+        cron = croniter(self.normalized_schedule_interval(), now)
+
+        start = cron.get_next(datetime)
+        cron_next = cron.get_next(datetime)
+
+        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+            return True
+
+        return False
+
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        if num is not None:
+            end_date = None
+        return utils_date_range(
+            start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval()
+        )
+
+    def following_schedule(self, dttm):
+        """
+        Calculates the following schedule for this dag in UTC.
+
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
+        if isinstance(self.normalized_schedule_interval(), str):
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)
+            naive = timezone.make_naive(dttm, self.timezone)
+            cron = croniter(self.normalized_schedule_interval(), naive)
+
+            # We assume that DST transitions happen on the minute/hour
+            if not self.is_fixed_time_schedule():
+                # relative offset (eg. every 5 minutes)
+                delta = cron.get_next(datetime) - naive
+                following = dttm.in_timezone(self.timezone) + delta
+            else:
+                # absolute (e.g. 3 AM)
+                naive = cron.get_next(datetime)
+                tz = pendulum.timezone(self.timezone.name)
+                following = timezone.make_aware(naive, tz)
+            return timezone.convert_to_utc(following)
+        elif self.normalized_schedule_interval() is not None:
+            return timezone.convert_to_utc(dttm + self.normalized_schedule_interval())
+
+    def previous_schedule(self, dttm):
+        """
+        Calculates the previous schedule for this dag in UTC
+
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
+        if isinstance(self.normalized_schedule_interval(), str):
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)

Review comment:
       these names are not descriptive

##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
+            _schedule_interval = cron_presets.get(self.schedule_interval)  # type: Optional[ScheduleInterval]
+        elif self.schedule_interval == '@once':
+            _schedule_interval = None
+        else:
+            _schedule_interval = self.schedule_interval
+        return _schedule_interval
+
+    def is_fixed_time_schedule(self):
+        """
+        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
+
+        :return: True if the schedule has a fixed time, False if not.
+        """
+        now = datetime.now()
+        cron = croniter(self.normalized_schedule_interval(), now)
+
+        start = cron.get_next(datetime)
+        cron_next = cron.get_next(datetime)
+
+        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+            return True
+
+        return False
+
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        if num is not None:

Review comment:
       num is not a descriptive variable name.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jhtimmins commented on a change in pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

Posted by GitBox <gi...@apache.org>.
jhtimmins commented on a change in pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#discussion_r565679418



##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:

Review comment:
       @dimberman  This code is almost entirely copy+pasted from existing code. So I'll make things readable before sending it around.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] closed pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #13930:
URL: https://github.com/apache/airflow/pull/13930


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org