You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/27 21:57:54 UTC

[GitHub] [airflow] dimberman commented on a change in pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

dimberman commented on a change in pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#discussion_r565662528



##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:

Review comment:
       This function name feels a bit vague to me. I would add docs to the abstract class or consider making the name more descriptive.

##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:

Review comment:
       Will this be a necessary step for anyone looking to backport? Is there a way we can make this easier?

##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
+            _schedule_interval = cron_presets.get(self.schedule_interval)  # type: Optional[ScheduleInterval]
+        elif self.schedule_interval == '@once':
+            _schedule_interval = None
+        else:
+            _schedule_interval = self.schedule_interval
+        return _schedule_interval
+
+    def is_fixed_time_schedule(self):
+        """
+        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
+
+        :return: True if the schedule has a fixed time, False if not.
+        """
+        now = datetime.now()
+        cron = croniter(self.normalized_schedule_interval(), now)
+
+        start = cron.get_next(datetime)
+        cron_next = cron.get_next(datetime)
+
+        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+            return True
+
+        return False
+
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        if num is not None:
+            end_date = None
+        return utils_date_range(
+            start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval()
+        )
+
+    def following_schedule(self, dttm):
+        """
+        Calculates the following schedule for this dag in UTC.
+
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
+        if isinstance(self.normalized_schedule_interval(), str):
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)
+            naive = timezone.make_naive(dttm, self.timezone)
+            cron = croniter(self.normalized_schedule_interval(), naive)
+
+            # We assume that DST transitions happen on the minute/hour

Review comment:
       Is that a reasonable assumption to make?

##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
+            _schedule_interval = cron_presets.get(self.schedule_interval)  # type: Optional[ScheduleInterval]
+        elif self.schedule_interval == '@once':
+            _schedule_interval = None
+        else:
+            _schedule_interval = self.schedule_interval
+        return _schedule_interval
+
+    def is_fixed_time_schedule(self):
+        """
+        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
+
+        :return: True if the schedule has a fixed time, False if not.
+        """
+        now = datetime.now()
+        cron = croniter(self.normalized_schedule_interval(), now)
+
+        start = cron.get_next(datetime)
+        cron_next = cron.get_next(datetime)
+
+        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+            return True
+
+        return False
+
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        if num is not None:
+            end_date = None
+        return utils_date_range(
+            start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval()
+        )
+
+    def following_schedule(self, dttm):
+        """
+        Calculates the following schedule for this dag in UTC.
+
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
+        if isinstance(self.normalized_schedule_interval(), str):
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)
+            naive = timezone.make_naive(dttm, self.timezone)
+            cron = croniter(self.normalized_schedule_interval(), naive)
+
+            # We assume that DST transitions happen on the minute/hour
+            if not self.is_fixed_time_schedule():
+                # relative offset (eg. every 5 minutes)
+                delta = cron.get_next(datetime) - naive
+                following = dttm.in_timezone(self.timezone) + delta
+            else:
+                # absolute (e.g. 3 AM)
+                naive = cron.get_next(datetime)
+                tz = pendulum.timezone(self.timezone.name)
+                following = timezone.make_aware(naive, tz)
+            return timezone.convert_to_utc(following)
+        elif self.normalized_schedule_interval() is not None:
+            return timezone.convert_to_utc(dttm + self.normalized_schedule_interval())
+
+    def previous_schedule(self, dttm):
+        """
+        Calculates the previous schedule for this dag in UTC
+
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
+        if isinstance(self.normalized_schedule_interval(), str):
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)

Review comment:
       These names (e.g. `dttm`) are not descriptive.

##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        """
+        Returns Normalized Schedule Interval. This is used internally by the Scheduler to
+        schedule DAGs.
+
+        1. Converts Cron Preset to a Cron Expression (e.g ``@monthly`` to ``0 0 1 * *``)
+        2. If Schedule Interval is "@once" return "None"
+        3. If not (1) or (2) returns schedule_interval
+        """
+        if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
+            _schedule_interval = cron_presets.get(self.schedule_interval)  # type: Optional[ScheduleInterval]
+        elif self.schedule_interval == '@once':
+            _schedule_interval = None
+        else:
+            _schedule_interval = self.schedule_interval
+        return _schedule_interval
+
+    def is_fixed_time_schedule(self):
+        """
+        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
+
+        :return: True if the schedule has a fixed time, False if not.
+        """
+        now = datetime.now()
+        cron = croniter(self.normalized_schedule_interval(), now)
+
+        start = cron.get_next(datetime)
+        cron_next = cron.get_next(datetime)
+
+        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+            return True
+
+        return False
+
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        if num is not None:

Review comment:
       `num` is not a descriptive variable name.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org