You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:06:30 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #15397: AIP-39: Use TimeTable interface to implement scheduling inside the DAG class

kaxil commented on a change in pull request #15397:
URL: https://github.com/apache/airflow/pull/15397#discussion_r653891667



##########
File path: airflow/exceptions.py
##########
@@ -115,6 +115,17 @@ class AirflowClusterPolicyViolation(AirflowException):
     """Raise when there is a violation of a Cluster Policy in Dag definition"""
 
 
+class AirflowTimetableInvalid(AirflowException):

Review comment:
       ```suggestion
   class AirflowInvalidTimetable(AirflowException):
   ```
   
   The name `AirflowTimetableInvalid` reads oddly; `AirflowInvalidTimetable` sounds more natural. No strong opinion, though.

##########
File path: airflow/models/dag.py
##########
@@ -450,219 +454,167 @@ def date_range(
         num: Optional[int] = None,
         end_date: Optional[datetime] = timezone.utcnow(),
     ) -> List[datetime]:
+        message = "`DAG.date_range()` is deprecated."
         if num is not None:
-            end_date = None
-        return utils_date_range(
-            start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval
-        )
+            result = utils_date_range(start_date=start_date, num=num)
+        else:
+            message += " Please use `DAG.get_run_dates(..., align=False)` instead."
+            result = self.get_run_dates(start_date, end_date, align=False)
+        warnings.warn(message, category=DeprecationWarning, stacklevel=2)
+        return result
 
     def is_fixed_time_schedule(self):
-        """
-        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
-
-        :return: True if the schedule has a fixed time, False if not.
-        """
-        now = datetime.now()
-        cron = croniter(self.normalized_schedule_interval, now)
-
-        start = cron.get_next(datetime)
-        cron_next = cron.get_next(datetime)
-
-        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+        warnings.warn(
+            "`DAG.is_fixed_time_schedule()` is deprecated.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+        try:
+            return not self.timetable._schedule._should_fix_dst
+        except AttributeError:
             return True
 
-        return False
-
     def following_schedule(self, dttm):
         """
         Calculates the following schedule for this dag in UTC.
 
         :param dttm: utc datetime
         :return: utc datetime
         """
-        if isinstance(self.normalized_schedule_interval, str):
-            # we don't want to rely on the transitions created by
-            # croniter as they are not always correct
-            dttm = pendulum.instance(dttm)
-            naive = timezone.make_naive(dttm, self.timezone)
-            cron = croniter(self.normalized_schedule_interval, naive)
-
-            # We assume that DST transitions happen on the minute/hour
-            if not self.is_fixed_time_schedule():
-                # relative offset (eg. every 5 minutes)
-                delta = cron.get_next(datetime) - naive
-                following = dttm.in_timezone(self.timezone) + delta
-            else:
-                # absolute (e.g. 3 AM)
-                naive = cron.get_next(datetime)
-                tz = pendulum.timezone(self.timezone.name)
-                following = timezone.make_aware(naive, tz)
-            return timezone.convert_to_utc(following)
-        elif self.normalized_schedule_interval is not None:
-            return timezone.convert_to_utc(dttm + self.normalized_schedule_interval)
+        current = pendulum.instance(dttm)
+        between = TimeRestriction(None, None, catchup=True)

Review comment:
       ```suggestion
           between = TimeRestriction(earliest=None, latest=None, catchup=True)
   ```
   
   Passing these as keyword arguments makes the call easier to understand :) — again, not strictly needed.

##########
File path: airflow/models/dag.py
##########
@@ -450,219 +454,167 @@ def date_range(
         num: Optional[int] = None,
         end_date: Optional[datetime] = timezone.utcnow(),
     ) -> List[datetime]:
+        message = "`DAG.date_range()` is deprecated."
         if num is not None:
-            end_date = None
-        return utils_date_range(
-            start_date=start_date, end_date=end_date, num=num, delta=self.normalized_schedule_interval
-        )
+            result = utils_date_range(start_date=start_date, num=num)
+        else:
+            message += " Please use `DAG.get_run_dates(..., align=False)` instead."
+            result = self.get_run_dates(start_date, end_date, align=False)
+        warnings.warn(message, category=DeprecationWarning, stacklevel=2)
+        return result
 
     def is_fixed_time_schedule(self):
-        """
-        Figures out if the DAG schedule has a fixed time (e.g. 3 AM).
-
-        :return: True if the schedule has a fixed time, False if not.
-        """
-        now = datetime.now()
-        cron = croniter(self.normalized_schedule_interval, now)
-
-        start = cron.get_next(datetime)
-        cron_next = cron.get_next(datetime)
-
-        if cron_next.minute == start.minute and cron_next.hour == start.hour:
+        warnings.warn(
+            "`DAG.is_fixed_time_schedule()` is deprecated.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+        try:
+            return not self.timetable._schedule._should_fix_dst
+        except AttributeError:
             return True
 
-        return False
-
     def following_schedule(self, dttm):
         """
         Calculates the following schedule for this dag in UTC.
 
         :param dttm: utc datetime
         :return: utc datetime
         """
-        if isinstance(self.normalized_schedule_interval, str):
-            # we don't want to rely on the transitions created by
-            # croniter as they are not always correct
-            dttm = pendulum.instance(dttm)
-            naive = timezone.make_naive(dttm, self.timezone)
-            cron = croniter(self.normalized_schedule_interval, naive)
-
-            # We assume that DST transitions happen on the minute/hour
-            if not self.is_fixed_time_schedule():
-                # relative offset (eg. every 5 minutes)
-                delta = cron.get_next(datetime) - naive
-                following = dttm.in_timezone(self.timezone) + delta
-            else:
-                # absolute (e.g. 3 AM)
-                naive = cron.get_next(datetime)
-                tz = pendulum.timezone(self.timezone.name)
-                following = timezone.make_aware(naive, tz)
-            return timezone.convert_to_utc(following)
-        elif self.normalized_schedule_interval is not None:
-            return timezone.convert_to_utc(dttm + self.normalized_schedule_interval)
+        current = pendulum.instance(dttm)
+        between = TimeRestriction(None, None, catchup=True)
+        next_info = self.timetable.next_dagrun_info(current, between)
+        if next_info is None:
+            return None
+        return next_info.data_interval.start
 
     def previous_schedule(self, dttm):
-        """
-        Calculates the previous schedule for this dag in UTC
-
-        :param dttm: utc datetime
-        :return: utc datetime
-        """
-        if isinstance(self.normalized_schedule_interval, str):
-            # we don't want to rely on the transitions created by
-            # croniter as they are not always correct
-            dttm = pendulum.instance(dttm)
-            naive = timezone.make_naive(dttm, self.timezone)
-            cron = croniter(self.normalized_schedule_interval, naive)
-
-            # We assume that DST transitions happen on the minute/hour
-            if not self.is_fixed_time_schedule():
-                # relative offset (eg. every 5 minutes)
-                delta = naive - cron.get_prev(datetime)
-                previous = dttm.in_timezone(self.timezone) - delta
-            else:
-                # absolute (e.g. 3 AM)
-                naive = cron.get_prev(datetime)
-                tz = pendulum.timezone(self.timezone.name)
-                previous = timezone.make_aware(naive, tz)
-            return timezone.convert_to_utc(previous)
-        elif self.normalized_schedule_interval is not None:
-            return timezone.convert_to_utc(dttm - self.normalized_schedule_interval)
+        warnings.warn(
+            "`DAG.previous_schedule()` is deprecated.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+        try:
+            schedule: Schedule = self.timetable._schedule
+        except AttributeError:
+            return None
+        return schedule.get_prev(pendulum.instance(dttm))
 
     def next_dagrun_info(
         self,
         date_last_automated_dagrun: Optional[pendulum.DateTime],
     ) -> Tuple[Optional[pendulum.DateTime], Optional[pendulum.DateTime]]:
-        """
-        Get information about the next DagRun of this dag after ``date_last_automated_dagrun`` -- the
-        execution date, and the earliest it could be scheduled
-
-        :param date_last_automated_dagrun: The max(execution_date) of existing
-            "automated" DagRuns for this dag (scheduled or backfill, but not
-            manual)
-        """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        """Get information about the next DagRun of this dag after ``date_last_automated_dagrun``.
+
+        This calculates what time interval the next DagRun should operate on
+        (its execution date), and when it can be scheduled, , according to the

Review comment:
       ```suggestion
           (its execution date), and when it can be scheduled, according to the
   ```

##########
File path: airflow/models/dag.py
##########
@@ -533,94 +537,53 @@ def next_dagrun_info(
             "automated" DagRuns for this dag (scheduled or backfill, but not
             manual)
         """
-        if (
-            self.schedule_interval == "@once" and date_last_automated_dagrun
-        ) or self.schedule_interval is None:
-            # Manual trigger, or already created the run for @once, can short circuit
+        # XXX: The timezone.coerce_datetime calls in this function should not

Review comment:
       ```suggestion
           # FIXME: The timezone.coerce_datetime calls in this function should not
   ```
   
   The codebase already uses both `TODO` and `FIXME` markers (not `XXX`). Either one is fine, and sticking to them makes these notes easier to `grep` :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org