Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/04 13:02:12 UTC

[GitHub] [airflow] uranusjr opened a new pull request #17414: @uranusjr Allow custom timetable as a DAG argument

uranusjr opened a new pull request #17414:
URL: https://github.com/apache/airflow/pull/17414


   To allow a custom `timetable` argument to work on a DAG, the existing `schedule_interval` attribute and the `normalized_schedule_interval` property are removed. All existing access to `schedule_interval` (the normalized
   property is not used anywhere) is changed to use the DAG's `timetable` member. A couple of timetable-describing flags, `can_run` and `periodic`, are added to replace usages that need to know what the timetable
   is capable of doing.
   
   Serialization methods are added to timetable classes so the timetable can be saved to the meta database during DAG serialization.
   
   Since the `schedule_interval` attribute has been removed, the field on `DagModel` of the same name is also changed. Since we still need that information to display the timetable on the web UI, a field `timetable_summary` is added as its replacement. For convenience, this is actually implemented as a database field rename (so existing rows in the database may contain serialized `timedelta` and `relativedelta` values instead of a plain string), but all new `DagModel` rows from now on will only store string values in it.
   
   While we're at it, an extra field `timetable_description` is also added to implement cron expression explanation. This is not strictly related to the `schedule_interval` change, but I feel it is worthwhile to combine the changes into one migration instead of two.
   
   All web UI usages of `schedule_interval` now use `timetable_summary` instead. The UI part of `timetable_description` is *not* included, since mixing those does not have the same benefit as merging db migrations.
   
   This will conflict with #16352 (both contain a database migration), so we need to get that merged in first. But this one is more or less ready on its own, so I'm posting it for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#issuecomment-893693795


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] uranusjr closed pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr closed pull request #17414:
URL: https://github.com/apache/airflow/pull/17414


   





[GitHub] [airflow] uranusjr commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r698670417



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -87,6 +88,64 @@ def get_operator_extra_links():
     return _OPERATOR_EXTRA_LINKS
 
 
+def encode_relativedelta(var: relativedelta.relativedelta) -> Dict[str, Any]:
+    encoded = {k: v for k, v in var.__dict__.items() if not k.startswith("_") and v}
+    if var.weekday and var.weekday.n:
+        # Every n'th Friday for example
+        encoded['weekday'] = [var.weekday.weekday, var.weekday.n]
+    elif var.weekday:
+        encoded['weekday'] = [var.weekday.weekday]
+    return encoded
+
+
+def decode_relativedelta(var: Dict[str, Any]) -> relativedelta.relativedelta:
+    if 'weekday' in var:
+        var['weekday'] = relativedelta.weekday(*var['weekday'])  # type: ignore
+    return relativedelta.relativedelta(**var)
+
+
+def encode_timezone(var: Timezone) -> Union[str, int]:
+    """Encode a Pendulum Timezone for serialization.
+
+    Airflow only supports timezone objects that implements Pendulum's Timezone
+    interface. We try to keep as much information as possible to make conversion
+    round-tripping possible (see ``decode_timezone``). We need to special-case
+    UTC; Pendulum implements it as a FixedTimezone (i.e. it gets encoded as
+    0 without the special case), but passing 0 into ``pendulum.timezone`` does
+    not give us UTC (but ``+00:00``).
+    """
+    if isinstance(var, FixedTimezone):
+        if var.offset == 0:
+            return "UTC"
+        return var.offset
+    if isinstance(var, Timezone):
+        return var.name
+    raise ValueError(f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}")
+
+
+def decode_timezone(var: Union[str, int]) -> Timezone:
+    """Decode a previously serialized Pendulum Timezone."""
+    return pendulum.timezone(var)
+
+
+def encode_timetable(var: Timetable) -> Dict[str, Any]:
+    """Encode a timetable instance.
+
+    This delegates most of the serialization work to the type, so the behavior
+    can be completely controlled by a custom subclass.
+    """
+    return {"type": as_importable_string(type(var)), "value": var.serialize()}

Review comment:
       I’m assuming we’re not yet set on the serialisation method because of the class registration stuff (and I’m not sure the current method will always work if the timetable class is declared in a DAG file), so let’s not think too hard on this for now?
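
A minimal sketch of the `type`/`value` scheme discussed here, using only the standard library. `DeltaTimetable`, `importable_string`, `encode` and `decode` are hypothetical stand-ins for illustration, not Airflow's implementation. The decode step hints at why a class declared in a DAG file could be a problem: the stored dotted path has to be importable wherever decoding happens.

```python
import importlib
from datetime import timedelta
from typing import Any, Dict


class DeltaTimetable:
    """Hypothetical stand-in for a timetable class (not Airflow's)."""

    def __init__(self, delta: timedelta) -> None:
        self.delta = delta

    def serialize(self) -> Dict[str, Any]:
        # Store the interval as a JSON-friendly number of seconds.
        return {"delta": self.delta.total_seconds()}

    @classmethod
    def deserialize(cls, value: Dict[str, Any]) -> "DeltaTimetable":
        return cls(timedelta(seconds=value["delta"]))


def importable_string(cls: type) -> str:
    """Build a dotted path such as ``package.module.ClassName``."""
    return f"{cls.__module__}.{cls.__qualname__}"


def encode(timetable: Any) -> Dict[str, Any]:
    # The class decides what goes into "value"; we only record its path.
    return {"type": importable_string(type(timetable)), "value": timetable.serialize()}


def decode(data: Dict[str, Any]) -> Any:
    # Assumes a top-level class living in a module that can be imported here.
    module_name, _, class_name = data["type"].rpartition(".")
    cls = getattr(importlib.import_module(module_name), class_name)
    return cls.deserialize(data["value"])
```

If the module named in `type` cannot be imported by the process that decodes the row, `decode` raises an import error, which is the concern raised above for classes that only exist inside a DAG file.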







[GitHub] [airflow] ashb commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r695821210



##########
File path: airflow/models/dag.py
##########
@@ -92,11 +91,34 @@
 
 log = logging.getLogger(__name__)
 
-ScheduleInterval = Union[str, timedelta, relativedelta]
 DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
 ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']
 
+ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})

Review comment:
       Ahh cool.







[GitHub] [airflow] uranusjr commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r695662217



##########
File path: airflow/models/dag.py
##########
@@ -92,11 +91,34 @@
 
 log = logging.getLogger(__name__)
 
-ScheduleInterval = Union[str, timedelta, relativedelta]
 DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
 ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']
 
+ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})
+
 DagStateChangeCallback = Callable[[Context], None]
+ScheduleInterval = Union[str, timedelta, relativedelta]
+ScheduleIntervalArg = Union[ScheduleInterval, None, Type[ScheduleIntervalArgNotSet]]
+
+
+# Backward compatibility: If neither schedule_interval nor timetable is
+# *provided by the user*, default to a one-day interval.
+DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)

Review comment:
       It was `timedelta(days=1)` before this PR; I don’t know if it’s changed at some point.







[GitHub] [airflow] uranusjr commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r698310604



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -137,6 +137,11 @@
                 'label': 'custom_task',
             },
         ],
+        "schedule_interval": {"__type": "timedelta", "__var": 86400.0},
+        "timetable": {
+            "type": "airflow.timetables.interval.DeltaDataIntervalTimetable",
+            "value": {"delta": 86400.0},
+        },

Review comment:
       They are both included because I don’t know how to do conditional field inclusion 😆 







[GitHub] [airflow] uranusjr commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r698308122



##########
File path: airflow/models/dag.py
##########
@@ -92,11 +91,34 @@
 
 log = logging.getLogger(__name__)
 
-ScheduleInterval = Union[str, timedelta, relativedelta]
 DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
 ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']
 
+ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})
+
 DagStateChangeCallback = Callable[[Context], None]
+ScheduleInterval = Union[str, timedelta, relativedelta]
+ScheduleIntervalArg = Union[ScheduleInterval, None, Type[ScheduleIntervalArgNotSet]]
+
+
+# Backward compatibility: If neither schedule_interval nor timetable is
+# *provided by the user*, default to a one-day interval.
+DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)
+
+
+def create_timetable(interval: ScheduleIntervalArg, timezone: tzinfo) -> Timetable:
+    """Create a Timetable instance from a ``schedule_interval`` argument."""
+    if interval is ScheduleIntervalArgNotSet:
+        return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)

Review comment:
       `DEFAULT_SCHEDULE_INTERVAL` is also used to provide a default value for `DAG.schedule_interval`. This is needed for UI display.







[GitHub] [airflow] uranusjr merged pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr merged pull request #17414:
URL: https://github.com/apache/airflow/pull/17414


   





[GitHub] [airflow] uranusjr commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r698308122



##########
File path: airflow/models/dag.py
##########
@@ -92,11 +91,34 @@
 
 log = logging.getLogger(__name__)
 
-ScheduleInterval = Union[str, timedelta, relativedelta]
 DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
 ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']
 
+ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})
+
 DagStateChangeCallback = Callable[[Context], None]
+ScheduleInterval = Union[str, timedelta, relativedelta]
+ScheduleIntervalArg = Union[ScheduleInterval, None, Type[ScheduleIntervalArgNotSet]]
+
+
+# Backward compatibility: If neither schedule_interval nor timetable is
+# *provided by the user*, default to a one-day interval.
+DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)
+
+
+def create_timetable(interval: ScheduleIntervalArg, timezone: tzinfo) -> Timetable:
+    """Create a Timetable instance from a ``schedule_interval`` argument."""
+    if interval is ScheduleIntervalArgNotSet:
+        return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)

Review comment:
       `DEFAULT_SCHEDULE_INTERVAL` is also used to provide a default value for `DAG.schedule_interval` (see comment below for explanation)







[GitHub] [airflow] ashb commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r695823577



##########
File path: airflow/models/dag.py
##########
@@ -92,11 +91,34 @@
 
 log = logging.getLogger(__name__)
 
-ScheduleInterval = Union[str, timedelta, relativedelta]
 DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
 ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']
 
+ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})
+
 DagStateChangeCallback = Callable[[Context], None]
+ScheduleInterval = Union[str, timedelta, relativedelta]
+ScheduleIntervalArg = Union[ScheduleInterval, None, Type[ScheduleIntervalArgNotSet]]
+
+
+# Backward compatibility: If neither schedule_interval nor timetable is
+# *provided by the user*, default to a one-day interval.
+DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)
+
+
+def create_timetable(interval: ScheduleIntervalArg, timezone: tzinfo) -> Timetable:
+    """Create a Timetable instance from a ``schedule_interval`` argument."""
+    if interval is ScheduleIntervalArgNotSet:
+        return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)

Review comment:
       ```suggestion
           return DEFAULT_TIMETABLE
   ```

##########
File path: airflow/models/dag.py
##########
@@ -92,11 +91,34 @@
 
 log = logging.getLogger(__name__)
 
-ScheduleInterval = Union[str, timedelta, relativedelta]
 DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
 ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']
 
+ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})
+
 DagStateChangeCallback = Callable[[Context], None]
+ScheduleInterval = Union[str, timedelta, relativedelta]
+ScheduleIntervalArg = Union[ScheduleInterval, None, Type[ScheduleIntervalArgNotSet]]
+
+
+# Backward compatibility: If neither schedule_interval nor timetable is
+# *provided by the user*, default to a one-day interval.
+DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)

Review comment:
       ```suggestion
   DEFAULT_TIMETABLE = DeltaDataIntervalTimetable(timedelta(days=1))
   ```
   
   perhaps?

##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -137,6 +137,11 @@
                 'label': 'custom_task',
             },
         ],
+        "schedule_interval": {"__type": "timedelta", "__var": 86400.0},
+        "timetable": {
+            "type": "airflow.timetables.interval.DeltaDataIntervalTimetable",
+            "value": {"delta": 86400.0},
+        },

Review comment:
       I don't think we should have both of these in a serialized row -- shouldn't it be one xor the other?







[GitHub] [airflow] uranusjr commented on pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#issuecomment-908366250


   Static check failing due to unrelated issue.





[GitHub] [airflow] uranusjr commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r685803516



##########
File path: airflow/www/views.py
##########
@@ -2601,7 +2601,7 @@ def landing_times(self, session=None):
             x_points[task_id] = []
             for ti in tis:
                 ts = ti.execution_date
-                if dag.schedule_interval and dag.following_schedule(ts):
+                if dag.following_schedule(ts):

Review comment:
       `DAG.following_schedule()` returns None when `DAG.schedule_interval` is `None` anyway, so the `dag.schedule_interval` check is not needed.
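
A small sketch of why the extra check is redundant. `following_schedule` below is a hypothetical stand-in that only mirrors the behaviour described above (returning `None` when there is no schedule); it is not Airflow's implementation, and `old_check`/`new_check` are illustrative names.

```python
from datetime import datetime, timedelta
from typing import Optional


def following_schedule(interval: Optional[timedelta], ts: datetime) -> Optional[datetime]:
    """Hypothetical stand-in: next run time, or None for an unscheduled DAG."""
    if interval is None:
        return None
    return ts + interval


def old_check(interval: Optional[timedelta], ts: datetime) -> bool:
    # Condition before the change: guard on the interval first.
    return bool(interval and following_schedule(interval, ts))


def new_check(interval: Optional[timedelta], ts: datetime) -> bool:
    # Condition after the change: rely on the None return value alone.
    return bool(following_schedule(interval, ts))
```

For an unscheduled DAG (`None`) and for an ordinary non-zero interval, both checks give the same answer.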







[GitHub] [airflow] ashb commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r698663925



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -87,6 +88,64 @@ def get_operator_extra_links():
     return _OPERATOR_EXTRA_LINKS
 
 
+def encode_relativedelta(var: relativedelta.relativedelta) -> Dict[str, Any]:
+    encoded = {k: v for k, v in var.__dict__.items() if not k.startswith("_") and v}
+    if var.weekday and var.weekday.n:
+        # Every n'th Friday for example
+        encoded['weekday'] = [var.weekday.weekday, var.weekday.n]
+    elif var.weekday:
+        encoded['weekday'] = [var.weekday.weekday]
+    return encoded
+
+
+def decode_relativedelta(var: Dict[str, Any]) -> relativedelta.relativedelta:
+    if 'weekday' in var:
+        var['weekday'] = relativedelta.weekday(*var['weekday'])  # type: ignore
+    return relativedelta.relativedelta(**var)
+
+
+def encode_timezone(var: Timezone) -> Union[str, int]:
+    """Encode a Pendulum Timezone for serialization.
+
+    Airflow only supports timezone objects that implements Pendulum's Timezone
+    interface. We try to keep as much information as possible to make conversion
+    round-tripping possible (see ``decode_timezone``). We need to special-case
+    UTC; Pendulum implements it as a FixedTimezone (i.e. it gets encoded as
+    0 without the special case), but passing 0 into ``pendulum.timezone`` does
+    not give us UTC (but ``+00:00``).
+    """
+    if isinstance(var, FixedTimezone):
+        if var.offset == 0:
+            return "UTC"
+        return var.offset
+    if isinstance(var, Timezone):
+        return var.name
+    raise ValueError(f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}")
+
+
+def decode_timezone(var: Union[str, int]) -> Timezone:
+    """Decode a previously serialized Pendulum Timezone."""
+    return pendulum.timezone(var)
+
+
+def encode_timetable(var: Timetable) -> Dict[str, Any]:
+    """Encode a timetable instance.
+
+    This delegates most of the serialization work to the type, so the behavior
+    can be completely controlled by a custom subclass.
+    """
+    return {"type": as_importable_string(type(var)), "value": var.serialize()}

Review comment:
       Possibly `__type` to match with other "special" keys? Not sure

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -87,6 +88,64 @@ def get_operator_extra_links():
     return _OPERATOR_EXTRA_LINKS
 
 
+def encode_relativedelta(var: relativedelta.relativedelta) -> Dict[str, Any]:
+    encoded = {k: v for k, v in var.__dict__.items() if not k.startswith("_") and v}
+    if var.weekday and var.weekday.n:
+        # Every n'th Friday for example
+        encoded['weekday'] = [var.weekday.weekday, var.weekday.n]
+    elif var.weekday:
+        encoded['weekday'] = [var.weekday.weekday]
+    return encoded
+
+
+def decode_relativedelta(var: Dict[str, Any]) -> relativedelta.relativedelta:
+    if 'weekday' in var:
+        var['weekday'] = relativedelta.weekday(*var['weekday'])  # type: ignore
+    return relativedelta.relativedelta(**var)
+
+
+def encode_timezone(var: Timezone) -> Union[str, int]:
+    """Encode a Pendulum Timezone for serialization.
+
+    Airflow only supports timezone objects that implements Pendulum's Timezone
+    interface. We try to keep as much information as possible to make conversion
+    round-tripping possible (see ``decode_timezone``). We need to special-case
+    UTC; Pendulum implements it as a FixedTimezone (i.e. it gets encoded as
+    0 without the special case), but passing 0 into ``pendulum.timezone`` does
+    not give us UTC (but ``+00:00``).
+    """
+    if isinstance(var, FixedTimezone):
+        if var.offset == 0:
+            return "UTC"
+        return var.offset
+    if isinstance(var, Timezone):
+        return var.name
+    raise ValueError(f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}")
+
+
+def decode_timezone(var: Union[str, int]) -> Timezone:
+    """Decode a previously serialized Pendulum Timezone."""
+    return pendulum.timezone(var)
+
+
+def encode_timetable(var: Timetable) -> Dict[str, Any]:
+    """Encode a timetable instance.
+
+    This delegates most of the serialization work to the type, so the behavior
+    can be completely controlled by a custom subclass.
+    """
+    return {"type": as_importable_string(type(var)), "value": var.serialize()}

Review comment:
       👍🏻 We can tackle that next then.







[GitHub] [airflow] uranusjr commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r695663597



##########
File path: airflow/models/dag.py
##########
@@ -92,11 +91,34 @@
 
 log = logging.getLogger(__name__)
 
-ScheduleInterval = Union[str, timedelta, relativedelta]
 DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
 ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']
 
+ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})

Review comment:
       Only reason is for typing; `func(arg=object())` is impossible to type because any `Union` with `object` ends up being just `object`.
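
A rough sketch of the sentinel pattern described here. `ArgNotSet`, `ScheduleArg`, `DEFAULT_INTERVAL` and `resolve_interval` are hypothetical names chosen for illustration; the point is only that a dedicated class can appear in a `Union` as `Type[...]`, whereas a plain `object()` default would widen the whole annotation to `object`.

```python
from datetime import timedelta
from typing import Optional, Type, Union


class ArgNotSet:
    """Sentinel: the class object itself is used as the default value."""


# The annotation can name the sentinel explicitly next to the real types.
ScheduleArg = Union[str, timedelta, None, Type[ArgNotSet]]

DEFAULT_INTERVAL = timedelta(days=1)


def resolve_interval(interval: ScheduleArg = ArgNotSet) -> Optional[Union[str, timedelta]]:
    """Tell 'argument omitted' apart from an explicit ``None``."""
    if interval is None or isinstance(interval, (str, timedelta)):
        # The caller passed a real value, including an explicit None.
        return interval
    # Only the sentinel is left: the argument was not provided at all.
    return DEFAULT_INTERVAL
```

With this shape, `resolve_interval()` falls back to the one-day default while `resolve_interval(None)` keeps the explicit `None`.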







[GitHub] [airflow] ashb commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r698711237



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -87,6 +88,64 @@ def get_operator_extra_links():
     return _OPERATOR_EXTRA_LINKS
 
 
+def encode_relativedelta(var: relativedelta.relativedelta) -> Dict[str, Any]:
+    encoded = {k: v for k, v in var.__dict__.items() if not k.startswith("_") and v}
+    if var.weekday and var.weekday.n:
+        # Every n'th Friday for example
+        encoded['weekday'] = [var.weekday.weekday, var.weekday.n]
+    elif var.weekday:
+        encoded['weekday'] = [var.weekday.weekday]
+    return encoded
+
+
+def decode_relativedelta(var: Dict[str, Any]) -> relativedelta.relativedelta:
+    if 'weekday' in var:
+        var['weekday'] = relativedelta.weekday(*var['weekday'])  # type: ignore
+    return relativedelta.relativedelta(**var)
+
+
+def encode_timezone(var: Timezone) -> Union[str, int]:
+    """Encode a Pendulum Timezone for serialization.
+
+    Airflow only supports timezone objects that implements Pendulum's Timezone
+    interface. We try to keep as much information as possible to make conversion
+    round-tripping possible (see ``decode_timezone``). We need to special-case
+    UTC; Pendulum implements it as a FixedTimezone (i.e. it gets encoded as
+    0 without the special case), but passing 0 into ``pendulum.timezone`` does
+    not give us UTC (but ``+00:00``).
+    """
+    if isinstance(var, FixedTimezone):
+        if var.offset == 0:
+            return "UTC"
+        return var.offset
+    if isinstance(var, Timezone):
+        return var.name
+    raise ValueError(f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}")
+
+
+def decode_timezone(var: Union[str, int]) -> Timezone:
+    """Decode a previously serialized Pendulum Timezone."""
+    return pendulum.timezone(var)
+
+
+def encode_timetable(var: Timetable) -> Dict[str, Any]:
+    """Encode a timetable instance.
+
+    This delegates most of the serialization work to the type, so the behavior
+    can be completely controlled by a custom subclass.
+    """
+    return {"type": as_importable_string(type(var)), "value": var.serialize()}

Review comment:
       👍🏻 We can tackle that next then.







[GitHub] [airflow] uranusjr edited a comment on pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr edited a comment on pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#issuecomment-909005911


   Registration issue opened as #17931.





[GitHub] [airflow] ashb commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r695579923



##########
File path: airflow/models/dag.py
##########
@@ -92,11 +91,34 @@
 
 log = logging.getLogger(__name__)
 
-ScheduleInterval = Union[str, timedelta, relativedelta]
 DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
 ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']
 
+ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})

Review comment:
       Is there a reason this isn't just `object()`?
   
   Does this display better somewhere?

##########
File path: airflow/models/dag.py
##########
@@ -92,11 +91,34 @@
 
 log = logging.getLogger(__name__)
 
-ScheduleInterval = Union[str, timedelta, relativedelta]
 DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
 ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']
 
+ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})
+
 DagStateChangeCallback = Callable[[Context], None]
+ScheduleInterval = Union[str, timedelta, relativedelta]
+ScheduleIntervalArg = Union[ScheduleInterval, None, Type[ScheduleIntervalArgNotSet]]
+
+
+# Backward compatibility: If neither schedule_interval nor timetable is
+# *provided by the user*, default to a one-day interval.
+DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)

Review comment:
       Wasn't the default before `@daily`, which is subtly different from a one-day `timedelta`?
   
   (More than possible I'm wrong.)
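
To illustrate the "subtly different" remark, here is a rough stdlib-only sketch. It assumes, as far as I understand Airflow's scheduling, that a cron preset like `@daily` aligns runs to midnight, while a `timedelta(days=1)` interval is counted from the DAG's start date. The helper names `next_midnight` and `next_delta_boundary` are hypothetical and are not Airflow APIs.

```python
from datetime import datetime, timedelta


def next_midnight(after: datetime) -> datetime:
    """First midnight strictly after ``after`` (what a ``0 0 * * *`` cron hits)."""
    midnight = after.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + timedelta(days=1)


def next_delta_boundary(start: datetime, after: datetime, delta: timedelta) -> datetime:
    """First ``start + n * delta`` strictly after ``after``."""
    steps = (after - start) // delta + 1
    return start + steps * delta


start = datetime(2021, 8, 4, 13, 2)
cron_next = next_midnight(start)
delta_next = next_delta_boundary(start, start, timedelta(days=1))
```

With a start date that is not at midnight, the two boundaries land at different times of day, which is the distinction the question is getting at.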







[GitHub] [airflow] uranusjr closed pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr closed pull request #17414:
URL: https://github.com/apache/airflow/pull/17414


   





[GitHub] [airflow] uranusjr commented on pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#issuecomment-895919125


   OK I think this is good now.





[GitHub] [airflow] uranusjr commented on pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#issuecomment-894647171


   Removing `full tests needed` for now so this doesn’t use up CI resources unnecessarily.





[GitHub] [airflow] github-actions[bot] commented on pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#issuecomment-895834636


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ashb commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r698663925



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -87,6 +88,64 @@ def get_operator_extra_links():
     return _OPERATOR_EXTRA_LINKS
 
 
+def encode_relativedelta(var: relativedelta.relativedelta) -> Dict[str, Any]:
+    encoded = {k: v for k, v in var.__dict__.items() if not k.startswith("_") and v}
+    if var.weekday and var.weekday.n:
+        # Every n'th Friday for example
+        encoded['weekday'] = [var.weekday.weekday, var.weekday.n]
+    elif var.weekday:
+        encoded['weekday'] = [var.weekday.weekday]
+    return encoded
+
+
+def decode_relativedelta(var: Dict[str, Any]) -> relativedelta.relativedelta:
+    if 'weekday' in var:
+        var['weekday'] = relativedelta.weekday(*var['weekday'])  # type: ignore
+    return relativedelta.relativedelta(**var)
+
+
+def encode_timezone(var: Timezone) -> Union[str, int]:
+    """Encode a Pendulum Timezone for serialization.
+
+    Airflow only supports timezone objects that implements Pendulum's Timezone
+    interface. We try to keep as much information as possible to make conversion
+    round-tripping possible (see ``decode_timezone``). We need to special-case
+    UTC; Pendulum implements it as a FixedTimezone (i.e. it gets encoded as
+    0 without the special case), but passing 0 into ``pendulum.timezone`` does
+    not give us UTC (but ``+00:00``).
+    """
+    if isinstance(var, FixedTimezone):
+        if var.offset == 0:
+            return "UTC"
+        return var.offset
+    if isinstance(var, Timezone):
+        return var.name
+    raise ValueError(f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}")
+
+
+def decode_timezone(var: Union[str, int]) -> Timezone:
+    """Decode a previously serialized Pendulum Timezone."""
+    return pendulum.timezone(var)
+
+
+def encode_timetable(var: Timetable) -> Dict[str, Any]:
+    """Encode a timetable instance.
+
+    This delegates most of the serialization work to the type, so the behavior
+    can be completely controlled by a custom subclass.
+    """
+    return {"type": as_importable_string(type(var)), "value": var.serialize()}

Review comment:
       Possibly `__type` to match with other "special" keys? Not sure
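
For context, a hedged sketch of the `{"type": ..., "value": ...}` shape under discussion and how it could round-trip. Everything here is illustrative: `EveryNDays`, its `deserialize` classmethod, the `KNOWN_TIMETABLES` registry and `decode_timetable` are assumptions, and `as_importable_string` is re-implemented locally rather than taken from Airflow.

```python
from typing import Any, Dict, Type


def as_importable_string(cls: type) -> str:
    # Local stand-in for the helper used in the diff: "module.QualName".
    return f"{cls.__module__}.{cls.__qualname__}"


class EveryNDays:
    """Hypothetical custom timetable that controls its own serialization."""

    def __init__(self, days: int) -> None:
        self.days = days

    def serialize(self) -> Dict[str, Any]:
        return {"days": self.days}

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> "EveryNDays":
        return cls(data["days"])


# Assumed lookup table from importable string to class (not part of the PR).
KNOWN_TIMETABLES: Dict[str, Type[EveryNDays]] = {
    as_importable_string(EveryNDays): EveryNDays,
}


def encode_timetable(var: EveryNDays) -> Dict[str, Any]:
    # The type only contributes its name; the instance decides the payload.
    return {"type": as_importable_string(type(var)), "value": var.serialize()}


def decode_timetable(var: Dict[str, Any]) -> EveryNDays:
    return KNOWN_TIMETABLES[var["type"]].deserialize(var["value"])
```

Renaming the key to `__type` would only change the dictionary key in `encode_timetable` and the lookup in `decode_timetable`; the payload under `"value"` is untouched.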







[GitHub] [airflow] uranusjr commented on pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#issuecomment-909005911


   Registration issue opened as #17931.





[GitHub] [airflow] ashb commented on a change in pull request #17414: Allow custom timetable as a DAG argument

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #17414:
URL: https://github.com/apache/airflow/pull/17414#discussion_r695820908



##########
File path: airflow/models/dag.py
##########
@@ -92,11 +91,34 @@
 
 log = logging.getLogger(__name__)
 
-ScheduleInterval = Union[str, timedelta, relativedelta]
 DEFAULT_VIEW_PRESETS = ['tree', 'graph', 'duration', 'gantt', 'landing_times']
 ORIENTATION_PRESETS = ['LR', 'TB', 'RL', 'BT']
 
+ScheduleIntervalArgNotSet = type("ScheduleIntervalArgNotSet", (), {})
+
 DagStateChangeCallback = Callable[[Context], None]
+ScheduleInterval = Union[str, timedelta, relativedelta]
+ScheduleIntervalArg = Union[ScheduleInterval, None, Type[ScheduleIntervalArgNotSet]]
+
+
+# Backward compatibility: If neither schedule_interval nor timetable is
+# *provided by the user*, default to a one-day interval.
+DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)

Review comment:
       Cool. I'm just misremembering then.






