You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/27 22:24:00 UTC

[GitHub] [airflow] jhtimmins commented on a change in pull request #13930: WIP: POC of Timetable feature. Includes AbstractTimetable and CronTimetable classes.

jhtimmins commented on a change in pull request #13930:
URL: https://github.com/apache/airflow/pull/13930#discussion_r565679418



##########
File path: airflow/models/dag.py
##########
@@ -109,6 +110,312 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+class AbstractTimetable(LoggingMixin):
+    """Abstract class for Timetable."""
+
+    @abstractmethod
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_fixed_time_schedule(self):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def date_range(
+        self,
+        start_date: datetime,
+        num: Optional[int] = None,
+        end_date: Optional[datetime] = timezone.utcnow(),
+    ) -> List[datetime]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def following_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def previous_schedule(self, dttm):
+        raise NotImplementedError()
+
+    @abstractmethod
+    def next_dagrun_info(
+        self,
+        date_last_automated_dagrun: Optional[pendulum.DateTime],
+        tasks,
+    ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_run_dates(self, start_date, end_date=None):
+        raise NotImplementedError()
+
+
+class CronTimetable(AbstractTimetable):
+    """Timetable implementation for Cron."""
+
+    def __init__(
+        self,
+        schedule_interval=None,
+        timezone=None,
+        catchup=None,
+        start_date=None,
+        end_date=None,
+    ):
+        self.schedule_interval = schedule_interval
+        self.timezone = timezone
+        self.catchup = catchup
+        self.start_date = start_date
+        self.end_date = end_date
+
+    def normalized_schedule_interval(self) -> Optional[ScheduleInterval]:

Review comment:
       @dimberman  This code is almost entirely copy+pasted from existing code. So I'll make things readable before sending it around.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org