Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/26 17:25:17 UTC

[GitHub] [airflow] ecerulm opened a new pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

ecerulm opened a new pull request #16678:
URL: https://github.com/apache/airflow/pull/16678


   closes: #16613
   related: #16631
   
   This allows specifying a pendulum timezone for the DAG, which is used to qualify the `schedule_interval`.
   
   Currently the DAG timezone is inferred from the DAG's `start_date`, but that runs into a problem: the timezone is then a `datetime.tzinfo`, which is not generally serializable. Until now Airflow has relied on taking the tzinfo's name and serializing that, hoping that an equivalent tzinfo can be recovered via `pendulum.timezone(serialized_name)`.
   
   The idea behind this PR is to introduce a more direct way to specify the DAG's timezone, and to require it to be a pendulum timezone name (an IANA timezone string) or an offset in seconds. That is more reliable than trying to support serialization of arbitrary `datetime.tzinfo` objects, which runs into many corner cases.
   
   Having said that, this PR still tries to infer a `schedule_timezone` from `start_date` if one is not supplied directly, in this order:
   * use `start_date.tzinfo.name`, if it exists and pendulum recognizes it as a valid timezone identifier
   * use `start_date.tzinfo.tzname(...)`, if pendulum recognizes it as a valid timezone identifier
   * fall back to `start_date.tzinfo.utcoffset(...)`, as an offset in seconds
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#issuecomment-872818158


   Yeah, `tzinfo.tzname()` is not suitable for our serialisation since it makes no guarantee that the result can be decoded or round-tripped. A proper fix would be pretty wide in scope (we’d probably need an interoperability standard for tzinfo implementations), so let’s not go there for now…
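A stdlib-only illustration of the point above (not code from the PR): two fixed-offset zones can legitimately report the same `tzname()` abbreviation, here "IST" (used for both India Standard Time and Irish Standard Time), so the abbreviation alone cannot be decoded back to a unique zone.

```python
import datetime as dt

# Two distinct fixed-offset zones that both call themselves "IST".
india = dt.timezone(dt.timedelta(hours=5, minutes=30), "IST")
ireland = dt.timezone(dt.timedelta(hours=1), "IST")

# tzname() is identical, so serializing it would lose which zone was meant...
same_name = india.tzname(None) == ireland.tzname(None)
# ...even though the offsets, and therefore the instants, differ.
different_offsets = india.utcoffset(None) != ireland.utcoffset(None)

print(same_name, different_offsets)  # True True
```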





[GitHub] [airflow] ecerulm commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ecerulm commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r660095871



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -135,7 +136,7 @@
                 'label': 'custom_task',
             },
         ],
-        "timezone": "UTC",
+        "schedule_timezone": "UTC",

Review comment:
       Ok, I managed to keep it as `timezone` in the serialized object, but this is getting more and more complex, and more hacky than the alternative #16631.







[GitHub] [airflow] ashb commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r660465772



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -135,7 +136,7 @@
                 'label': 'custom_task',
             },
         ],
-        "timezone": "UTC",
+        "schedule_timezone": "UTC",

Review comment:
       If we can find a way to not have to do `return pendulum.parse("2021-01-01T12:00:00" + tz_name).tzinfo` I'd be happy with that approach, but with that it just feels too hacky, I think.







[GitHub] [airflow] ecerulm commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ecerulm commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r659856584



##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
         self.fileloc = back.f_code.co_filename if back else ""
         self.task_dict: Dict[str, BaseOperator] = {}
 
-        # set timezone from start_date
+        # set timezone from schedule_timezone or start_date
+        if schedule_timezone:
+            self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)

Review comment:
       But anyway, I'm removing `schedule_timezone` from the PR, right? Because that changes the public api of the DAG and that requires an AIP, right?







[GitHub] [airflow] ecerulm commented on pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
ecerulm commented on pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#issuecomment-872756457


   > Oh, do we actually need it to be a pendulum.TZ, or do we already "support" that and the built-in zoneinfo?
   
   There is no guarantee that the zoneinfo `tzname()` will return something that can be fed to `pendulum.timezone(xx)`:
   
   ```python
   from datetime import datetime
   from zoneinfo import ZoneInfo  # Python 3.9+
   
   import pendulum
   
   dt = datetime(2020, 10, 31, 12, tzinfo=ZoneInfo("America/Los_Angeles"))
   print(dt.tzname())  # PDT
   pendulum.timezone(dt.tzname())  # raises InvalidTimezone "PDT"
   ```
   
   So, for example, `ZoneInfo("America/Los_Angeles")` will be interpreted as an integer offset in the current state of this PR.
   
   If we wanted to support `ZoneInfo` too, we would need to check for the `.key` attribute as well (in addition to `.name`), and hope that both libraries support the same IANA names, so that `pendulum.timezone(zinfoobj.key)` yields a pendulum timezone equivalent to the given zoneinfo.
   
   But isn't it better to just say that it has to be a pendulum timezone, since at deserialization time it will be reconstructed as a pendulum timezone anyway?
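The `.name`/`.key` check described above could look roughly like this. It is a hedged sketch: `iana_key` is a hypothetical helper, and it relies on pendulum timezones exposing `.name` and on `zoneinfo.ZoneInfo` exposing `.key` (which may be `None` for a zone loaded from a file without a key). Whether pendulum accepts every key that zoneinfo does is exactly the open question raised above.

```python
import datetime as dt
from typing import Optional


def iana_key(tz: dt.tzinfo) -> Optional[str]:
    """Return an IANA-style identifier exposed by ``tz``, or None if there is none."""
    # pendulum timezones expose ``.name``; zoneinfo.ZoneInfo exposes ``.key``.
    for attr in ("name", "key"):
        value = getattr(tz, attr, None)
        if isinstance(value, str):
            return value
    # e.g. datetime.timezone, or a ZoneInfo loaded from a file without a key.
    return None
```

With tz data available, `iana_key(ZoneInfo("America/Los_Angeles"))` would return `"America/Los_Angeles"` rather than the abbreviation `"PDT"`.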





[GitHub] [airflow] uranusjr commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r659805268



##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
         self.fileloc = back.f_code.co_filename if back else ""
         self.task_dict: Dict[str, BaseOperator] = {}
 
-        # set timezone from start_date
+        # set timezone from schedule_timezone or start_date
+        if schedule_timezone:
+            self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)

Review comment:
       +1 on checking that `start_date` is either naive (and will be converted to `schedule_timezone`) or matches `schedule_timezone`. Maybe `end_date` as well, although we don’t currently do that? (Which is still a footgun.)

##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
         self.fileloc = back.f_code.co_filename if back else ""
         self.task_dict: Dict[str, BaseOperator] = {}
 
-        # set timezone from start_date
+        # set timezone from schedule_timezone or start_date
+        if schedule_timezone:
+            self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)

Review comment:
       > There is no problem to specify the `start_date` in one timezone, the `end_date` in another timezone, and have yet another timezone for the `schedule_timezone`. I can see people (myself included) using ISO 8601 strings like `2021-06-01T12:00:00+01:00` for `start_date` and `schedule_timezone='Europe/Stockholm'`
   
   Hmm, that makes sense. And since if `schedule_timezone` is _not_ set, `start_date.tzinfo` will be used anyway, there is no behavioural change for people not (yet) modifying their DAGs. And when `schedule_timezone` is set, the user should know what they’re doing.
   
   > But anyway, I'm removing `schedule_timezone` from the PR, right? Because that changes the public api of the DAG and that requires an AIP, right?
   
   I think so, yes, but we’re going to need to have this discussion when explicit per-DAG timezone setup becomes a thing (whether via `schedule_timezone` or another means) anyway.







[GitHub] [airflow] ecerulm closed pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
ecerulm closed pull request #16678:
URL: https://github.com/apache/airflow/pull/16678


   





[GitHub] [airflow] ecerulm commented on pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ecerulm commented on pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#issuecomment-869033342


   There is another PR, #16631 (also by me), that tries to solve the problem primarily at deserialization time.
   
   





[GitHub] [airflow] ecerulm commented on a change in pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
ecerulm commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r660624826



##########
File path: airflow/models/dag.py
##########
@@ -304,14 +339,15 @@ def __init__(
 
         # set timezone from start_date
         if start_date and start_date.tzinfo:
-            self.timezone = start_date.tzinfo
+            self._schedule_timezone = _get_tzname_or_offset(start_date.tzinfo)
         elif 'start_date' in self.default_args and self.default_args['start_date']:
             if isinstance(self.default_args['start_date'], str):
                 self.default_args['start_date'] = timezone.parse(self.default_args['start_date'])
-            self.timezone = self.default_args['start_date'].tzinfo
+            if self.default_args['start_date'] and self.default_args['start_date'].tzinfo:
+                self._schedule_timezone = _get_tzname_or_offset(self.default_args['start_date'].tzinfo)
 
-        if not hasattr(self, 'timezone') or not self.timezone:
-            self.timezone = settings.TIMEZONE
+        if not hasattr(self, '_schedule_timezone'):
+            self._schedule_timezone = _get_tzname_or_offset(settings.TIMEZONE)

Review comment:
       sure

##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
         self.fileloc = back.f_code.co_filename if back else ""
         self.task_dict: Dict[str, BaseOperator] = {}
 
-        # set timezone from start_date
+        # set timezone from schedule_timezone or start_date
+        if schedule_timezone:
+            self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)
         if start_date and start_date.tzinfo:
-            self.timezone = start_date.tzinfo
+            self.schedule_timezone = self._get_tzname_or_offset(start_date.tzinfo)
+            # self.timezone = start_date.tzinfo
         elif 'start_date' in self.default_args and self.default_args['start_date']:
             if isinstance(self.default_args['start_date'], str):
                 self.default_args['start_date'] = timezone.parse(self.default_args['start_date'])
-            self.timezone = self.default_args['start_date'].tzinfo
+            if self.default_args['start_date'] and self.default_args['start_date'].tzinfo:
+                self.schedule_timezone = self._get_tzname_or_offset(self.default_args['start_date'].tzinfo)
 
-        if not hasattr(self, 'timezone') or not self.timezone:
-            self.timezone = settings.TIMEZONE
+        if not hasattr(self, 'schedule_timezone'):
+            self.schedule_timezone = self._get_tzname_or_offset(settings.TIMEZONE)

Review comment:
       I've now left dag.py untouched.

##########
File path: airflow/utils/timezone.py
##########
@@ -184,3 +185,38 @@ def coerce_datetime(v: Union[None, dt.datetime, DateTime]) -> Optional[DateTime]
     if v.tzinfo is None:
         v = make_aware(v)
     return pendulum.instance(v)
+
+
+def _get_tzname_or_offset(obj):
+    if _is_valid_pendulum_tzname(obj):
+        return obj
+    if hasattr(obj, 'name'):
+        candidate = obj.name
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    try:
+        candidate = obj.tzname(dt.datetime.now(dt.timezone.utc))
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    try:
+        candidate = int(obj.utcoffset(dt.datetime.now(dt.timezone.utc)).total_seconds())

Review comment:
       @ashb do we want to do this or not? At this point there is no real guarantee that the `datetime.tzinfo` is really a fixed-offset timezone, and there is no good way to check whether it is. I can do an `isinstance(obj, pendulum.tz.timezone.FixedTimezone)` check, but that works only for pendulum timezones.
   
   The risk is that someone uses a custom `datetime.tzinfo` with a custom name that has no equivalent in pendulum, and this serialization logic will reduce that custom tzinfo to a fixed-offset timezone (which will behave differently from the original dag.timezone).
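To make the risk concrete, here is a stdlib-only sketch (not the PR's code). `ToyDstZone` is a hypothetical custom tzinfo with a simplified DST rule, and `reduce_to_fixed` is a hypothetical helper that mimics the offset fallback by keeping only the offset observed at one instant. After the reduction, summer datetimes get the winter offset.

```python
import datetime as dt


class ToyDstZone(dt.tzinfo):
    """Hypothetical custom tzinfo: UTC+2 from April to October, UTC+1 otherwise."""

    def _is_summer(self, d: dt.datetime) -> bool:
        return 4 <= d.month <= 10

    def utcoffset(self, d):
        return dt.timedelta(hours=2 if self._is_summer(d) else 1)

    def dst(self, d):
        return dt.timedelta(hours=1 if self._is_summer(d) else 0)

    def tzname(self, d):
        return "TOYST" if self._is_summer(d) else "TOYT"


def reduce_to_fixed(tz: dt.tzinfo, when: dt.datetime) -> dt.timezone:
    """Mimic the offset fallback: keep only the offset ``tz`` has at ``when``."""
    return dt.timezone(tz.utcoffset(when))


original = ToyDstZone()
fixed = reduce_to_fixed(original, dt.datetime(2021, 1, 15))  # serialized in winter
july = dt.datetime(2021, 7, 1, 12)

print(original.utcoffset(july))  # 2:00:00
print(fixed.utcoffset(july))     # 1:00:00 (the DST rule is lost)
```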

##########
File path: airflow/utils/timezone.py
##########
@@ -184,3 +185,38 @@ def coerce_datetime(v: Union[None, dt.datetime, DateTime]) -> Optional[DateTime]
     if v.tzinfo is None:
         v = make_aware(v)
     return pendulum.instance(v)
+
+
+def _get_tzname_or_offset(obj):
+    if _is_valid_pendulum_tzname(obj):
+        return obj
+    if hasattr(obj, 'name'):
+        candidate = obj.name
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    try:
+        candidate = obj.tzname(dt.datetime.now(dt.timezone.utc))
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    try:
+        candidate = int(obj.utcoffset(dt.datetime.now(dt.timezone.utc)).total_seconds())

Review comment:
       @ashb, @uranusjr do we want to do this or not? At this point there is no real guarantee that the `datetime.tzinfo` is really a fixed-offset timezone, and there is no good way to check whether it is. I can do an `isinstance(obj, pendulum.tz.timezone.FixedTimezone)` check, but that works only for pendulum timezones.
   
   The risk is that someone uses a custom `datetime.tzinfo` with a custom name that has no equivalent in pendulum, and this serialization logic will reduce that custom tzinfo to a fixed-offset timezone (which will behave differently from the original dag.timezone).







[GitHub] [airflow] uranusjr commented on pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#issuecomment-870730096


   Nothing more from me for now, but one thing for the future:
   
   > > The risk is that someone uses some custom datetime.tzinfo with a custom name with no equivalent in pendulum
   > 
   > I think so long as we document "don't do this" I'm okay with this. The chance of someone creating a custom timezone seems _very_ slim given Airflow's typical users.
   
   We’ll probably need to revisit this when 3.10 is out (and supported by Airflow), since `zoneinfo` will then become the most obvious tzinfo implementation instead of `pendulum.Timezone`. A follow-up issue tracking `zoneinfo.ZoneInfo` compatibility (and potentially an eventual switch to it from pendulum) would be nice to have.





[GitHub] [airflow] ashb commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r660466253



##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+def _get_tzname_or_offset(obj):

Review comment:
       This should live in `airflow.utils.timezone`.

##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+def _get_tzname_or_offset(obj):
+    if _is_valid_pendulum_tzname(obj):
+        return obj
+    if hasattr(obj, 'name'):
+        candidate = obj.name
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    try:
+        candidate = obj.tzname(datetime.now(timezone.utc))
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    try:
+        candidate = int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+
+def _is_valid_pendulum_tzname(name):
+    if isinstance(name, str) or isinstance(name, int):

Review comment:
       ```suggestion
       if isinstance(name, (str, int)):
   ```

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -712,7 +714,8 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
                 v = {task["task_id"]: SerializedBaseOperator.deserialize_operator(task) for task in v}
                 k = "task_dict"
             elif k == "timezone":
-                v = cls._deserialize_timezone(v)
+                dag._schedule_timezone = v

Review comment:
       Not needed if we keep `dag.timezone` as an attribute.

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,7 +186,9 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
+            if key == 'timezone':
+                serialized_object[key] = cls._serialize(object_to_serialize._schedule_timezone)
+            elif key in decorated_fields:

Review comment:
       We shouldn't need this here, handle it in `def _serialize` instead.

##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+def _get_tzname_or_offset(obj):
+    if _is_valid_pendulum_tzname(obj):
+        return obj
+    if hasattr(obj, 'name'):
+        candidate = obj.name
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    try:
+        candidate = obj.tzname(datetime.now(timezone.utc))
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    try:
+        candidate = int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+
+def _is_valid_pendulum_tzname(name):
+    if isinstance(name, str) or isinstance(name, int):
+        try:
+            return pendulum.timezone(name)

Review comment:
       ```suggestion
               pendulum.timezone(name)
               return True
   ```
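Folding both suggestions together, the validator might read as below. This is a sketch: `is_valid_tz_identifier` and its `lookup` parameter are hypothetical, with `lookup` standing in for `pendulum.timezone`, which is assumed to raise on unknown names. Catching `Exception` broadly is a simplification; real code should catch the specific error the lookup raises.

```python
from typing import Any, Callable


def is_valid_tz_identifier(name: Any, lookup: Callable[[Any], Any]) -> bool:
    """Return True if ``name`` is a str/int that ``lookup`` accepts."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(name, bool) or not isinstance(name, (str, int)):
        return False
    try:
        lookup(name)  # the returned timezone object itself is discarded
    except Exception:  # simplification: narrow this to the lookup's error type
        return False
    return True
```

Returning a plain `bool` (rather than the timezone object) keeps callers such as `_get_tzname_or_offset` from depending on the truthiness of a timezone instance.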

##########
File path: airflow/models/dag.py
##########
@@ -2209,7 +2249,7 @@ def get_num_task_instances(dag_id, task_ids=None, states=None, session=None):
     def get_serialized_fields(cls):
         """Stringified DAGs and operators contain exactly these fields."""
         if not cls.__serialized_fields:
-            cls.__serialized_fields = frozenset(vars(DAG(dag_id='test')).keys()) - {
+            cls.__serialized_fields = frozenset(set(vars(DAG(dag_id='test')).keys())) - {

Review comment:
       Unnecessary change if we keep the timezone as an actual property.

##########
File path: airflow/models/dag.py
##########
@@ -304,14 +339,15 @@ def __init__(
 
         # set timezone from start_date
         if start_date and start_date.tzinfo:
-            self.timezone = start_date.tzinfo
+            self._schedule_timezone = _get_tzname_or_offset(start_date.tzinfo)
         elif 'start_date' in self.default_args and self.default_args['start_date']:
             if isinstance(self.default_args['start_date'], str):
                 self.default_args['start_date'] = timezone.parse(self.default_args['start_date'])
-            self.timezone = self.default_args['start_date'].tzinfo
+            if self.default_args['start_date'] and self.default_args['start_date'].tzinfo:
+                self._schedule_timezone = _get_tzname_or_offset(self.default_args['start_date'].tzinfo)
 
-        if not hasattr(self, 'timezone') or not self.timezone:
-            self.timezone = settings.TIMEZONE
+        if not hasattr(self, '_schedule_timezone'):
+            self._schedule_timezone = _get_tzname_or_offset(settings.TIMEZONE)

Review comment:
       Does any of this actually need changing?
   
   Can't we keep `self.timezone` as an actual timezone object?







[GitHub] [airflow] ashb commented on a change in pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r664383462



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -284,7 +286,7 @@ def _deserialize(cls, encoded_var: Any) -> Any:
         elif type_ == DAT.TIMEDELTA:
             return datetime.timedelta(seconds=var)
         elif type_ == DAT.TIMEZONE:
-            return Timezone(var)
+            return pendulum.timezone(var)

Review comment:
       For consistency with `deserialize_dag`, should this be
   
   ```suggestion
               return cls._deserialize_timezone(var)
   ```
   
   (I think the functionality is identical.)







[GitHub] [airflow] uranusjr commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r659895589



##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
         self.fileloc = back.f_code.co_filename if back else ""
         self.task_dict: Dict[str, BaseOperator] = {}
 
-        # set timezone from start_date
+        # set timezone from schedule_timezone or start_date
+        if schedule_timezone:
+            self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)

Review comment:
       > There is no problem to specify the `start_date` in one timezone, the `end_date` in another timezone, and have yet another timezone for the `schedule_timezone`. I can see people (myself included) using ISO 8601 strings like `2021-06-01T12:00:00+01:00` for `start_date` and `schedule_timezone='Europe/Stockholm'`
   
   Hmm, that makes sense. And since if `schedule_timezone` is _not_ set, `start_date.tzinfo` will be used anyway, there is no behavioural change for people not (yet) modifying their DAGs. And when `schedule_timezone` is set, the user should know what they’re doing.
   
   > But anyway, I'm removing `schedule_timezone` from the PR, right? Because that changes the public api of the DAG and that requires an AIP, right?
   
   I think so, yes, but we’re going to need to have this discussion when explicit per-DAG timezone setup becomes a thing (whether via `schedule_timezone` or another means) anyway.







[GitHub] [airflow] ecerulm commented on pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ecerulm commented on pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#issuecomment-870025210


   So I removed the `schedule_timezone` argument and renamed the `schedule_timezone` attribute to `_schedule_timezone` (private), since both were changing the public API of the DAG, and such a change requires an AIP.
   
   @potiuk, @ashb what is required to write an AIP? I don't have the permissions to create one.






[GitHub] [airflow] ashb commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r659728583



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -135,7 +136,7 @@
                 'label': 'custom_task',
             },
         ],
-        "timezone": "UTC",
+        "schedule_timezone": "UTC",

Review comment:
       Okay, so now we come to an interesting problem that we, so far, have not had to deal with.
   
   We need to be able to keep parsing "old" serialized versions, so it might be easiest _not_ to change the field name: keep it called `timezone` in the serialized representation, but widen its type so that it can be an int or a string.
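A minimal sketch of the read side of "keep the field called `timezone` but widen its type". It assumes the serialized value is either a timezone name (the only thing old payloads contain) or an offset in seconds. The function name `deserialize_timezone_field` and the pluggable `resolve_name` callable are hypothetical; in Airflow the name lookup would presumably be done by pendulum.

```python
import datetime as dt
from typing import Callable, Union


def deserialize_timezone_field(
    value: Union[str, int], resolve_name: Callable[[str], dt.tzinfo]
) -> dt.tzinfo:
    """Hypothetical decoder for a `timezone` field holding a name or an offset."""
    # bool is a subclass of int, so reject it before the int branch.
    if isinstance(value, bool):
        raise TypeError("timezone must be a name or an offset in seconds, not a bool")
    if isinstance(value, int):
        # Widened type: fixed UTC offset in seconds (must be within +/- 24 hours).
        return dt.timezone(dt.timedelta(seconds=value))
    if isinstance(value, str):
        # Old payloads only ever stored names, so this branch keeps them working.
        return resolve_name(value)
    raise TypeError(f"unsupported serialized timezone: {value!r}")
```

With the standard library, `zoneinfo.ZoneInfo` (Python 3.9+) is one possible `resolve_name`, e.g. `deserialize_timezone_field("Europe/Stockholm", zoneinfo.ZoneInfo)`. Because the key name is unchanged, no schema migration is needed for existing serialized DAGs.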







[GitHub] [airflow] uranusjr commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r659282284



##########
File path: airflow/models/dag.py
##########
@@ -422,6 +428,38 @@ def __exit__(self, _type, _value, _tb):
 
     # /Context Manager ----------------------------------------------
 
+    def _get_tzname_or_offset(self, obj):

Review comment:
       These functions (`_get_tzname_or_offset` and `_is_valid_pendulum_tz_id`) do not use anything on the DAG (i.e. `self`) and should be free functions instead. Maybe put them at the top of the module or in `airflow.utils.dates`?

##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
         self.fileloc = back.f_code.co_filename if back else ""
         self.task_dict: Dict[str, BaseOperator] = {}
 
-        # set timezone from start_date
+        # set timezone from schedule_timezone or start_date
+        if schedule_timezone:
+            self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)
         if start_date and start_date.tzinfo:
-            self.timezone = start_date.tzinfo
+            self.schedule_timezone = self._get_tzname_or_offset(start_date.tzinfo)
+            # self.timezone = start_date.tzinfo
         elif 'start_date' in self.default_args and self.default_args['start_date']:
             if isinstance(self.default_args['start_date'], str):
                 self.default_args['start_date'] = timezone.parse(self.default_args['start_date'])
-            self.timezone = self.default_args['start_date'].tzinfo
+            if self.default_args['start_date'] and self.default_args['start_date'].tzinfo:
+                self.schedule_timezone = self._get_tzname_or_offset(self.default_args['start_date'].tzinfo)
 
-        if not hasattr(self, 'timezone') or not self.timezone:
-            self.timezone = settings.TIMEZONE
+        if not hasattr(self, 'schedule_timezone'):
+            self.schedule_timezone = self._get_tzname_or_offset(settings.TIMEZONE)

Review comment:
       This `hasattr` call is super weird. This is the initialiser; we should be able to know whether `self.schedule_timezone` is set without introspection. (Granted, the existing code already does the same, but that does not make it less awkward.)
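One way to avoid the `hasattr` introspection is to compute the timezone into a local variable and assign the attribute exactly once. The sketch below assumes the same precedence as the code under review: `start_date`'s tzinfo, then `default_args['start_date']`'s tzinfo, then a configured default (standing in for `settings.TIMEZONE`). `resolve_dag_timezone` is a hypothetical name, and string parsing is simplified to `datetime.fromisoformat` instead of Airflow's own parser.

```python
import datetime as dt
from typing import Any, Dict, Optional


def resolve_dag_timezone(
    start_date: Optional[dt.datetime],
    default_args: Dict[str, Any],
    default_tz: dt.tzinfo,
) -> dt.tzinfo:
    """Hypothetical: pick the DAG timezone without any hasattr checks."""
    tz: Optional[dt.tzinfo] = None
    if start_date is not None and start_date.tzinfo is not None:
        tz = start_date.tzinfo
    else:
        default_start = default_args.get("start_date")
        if isinstance(default_start, str):
            # Simplification: Airflow parses this string with its own helper.
            default_start = dt.datetime.fromisoformat(default_start)
        if default_start is not None and default_start.tzinfo is not None:
            tz = default_start.tzinfo
    # Fall back to the configured default when nothing supplied a tzinfo.
    return tz if tz is not None else default_tz
```

In `__init__` this becomes a single `self.timezone = resolve_dag_timezone(...)` line, so later code never has to ask whether the attribute exists.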

##########
File path: airflow/models/dag.py
##########
@@ -422,6 +428,38 @@ def __exit__(self, _type, _value, _tb):
 
     # /Context Manager ----------------------------------------------
 
+    def _get_tzname_or_offset(self, obj):
+        if self._is_valid_pendulum_tz_id(obj):
+            return obj
+        if hasattr(obj, 'name'):
+            candidate = obj.name
+            if self._is_valid_pendulum_tz_id(candidate):
+                return candidate
+        try:
+            candidate = obj.tzname(datetime.now(timezone.utc))
+            if self._is_valid_pendulum_tz_id(candidate):
+                return candidate
+        except (NotImplementedError, ValueError):
+            pass
+
+        try:
+            candidate = int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+            if self._is_valid_pendulum_tz_id(candidate):
+                return candidate
+        except (NotImplementedError, ValueError):
+            pass
+
+        raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+    def _is_valid_pendulum_tz_id(self, id):

Review comment:
       Don't name a variable `id` (it shadows the built-in, and linters will be unhappy). "Tz id" is also a bit confusing to me (that value does not actually identify anything; "id" is short for identifier). Maybe call it `name` or something?







[GitHub] [airflow] ashb commented on pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#issuecomment-872291623


   Oh, do we actually need it to be a pendulum.TZ, or do we already "support" both that and the built-in `zoneinfo`?





[GitHub] [airflow] ashb commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r659726123



##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
         self.fileloc = back.f_code.co_filename if back else ""
         self.task_dict: Dict[str, BaseOperator] = {}
 
-        # set timezone from start_date
+        # set timezone from schedule_timezone or start_date
+        if schedule_timezone:
+            self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)

Review comment:
       Should we do any checks to make sure that the start_date is in the right timezone?
   
   (Just trying to think about potential foot-guns that might surprise users.)







[GitHub] [airflow] ecerulm commented on a change in pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
ecerulm commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r660662262



##########
File path: airflow/utils/timezone.py
##########
@@ -184,3 +185,38 @@ def coerce_datetime(v: Union[None, dt.datetime, DateTime]) -> Optional[DateTime]
     if v.tzinfo is None:
         v = make_aware(v)
     return pendulum.instance(v)
+
+
+def _get_tzname_or_offset(obj):
+    if _is_valid_pendulum_tzname(obj):
+        return obj
+    if hasattr(obj, 'name'):
+        candidate = obj.name
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    try:
+        candidate = obj.tzname(dt.datetime.now(dt.timezone.utc))
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    try:
+        candidate = int(obj.utcoffset(dt.datetime.now(dt.timezone.utc)).total_seconds())

Review comment:
       Ok, I added a "Don't use other `datetime.tzinfo` implementations" caveat on that page (timezone.rst)







[GitHub] [airflow] ashb commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r660465772



##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -135,7 +136,7 @@
                 'label': 'custom_task',
             },
         ],
-        "timezone": "UTC",
+        "schedule_timezone": "UTC",

Review comment:
       If we can find a way to avoid having to do `return pendulum.parse("2021-01-01T12:00:00" + tz_name).tzinfo`, I'd be happy with that approach, but as it stands it just feels too hacky, I think.

##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+def _get_tzname_or_offset(obj):

Review comment:
       This should live in `airflow.utils.timezone`.

##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+def _get_tzname_or_offset(obj):
+    if _is_valid_pendulum_tzname(obj):
+        return obj
+    if hasattr(obj, 'name'):
+        candidate = obj.name
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    try:
+        candidate = obj.tzname(datetime.now(timezone.utc))
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    try:
+        candidate = int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+
+def _is_valid_pendulum_tzname(name):
+    if isinstance(name, str) or isinstance(name, int):

Review comment:
       ```suggestion
       if isinstance(name, (str, int)):
   ```

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -712,7 +714,8 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
                 v = {task["task_id"]: SerializedBaseOperator.deserialize_operator(task) for task in v}
                 k = "task_dict"
             elif k == "timezone":
-                v = cls._deserialize_timezone(v)
+                dag._schedule_timezone = v

Review comment:
       Not needed if we keep `dag.timezone` as an attribute.

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -186,7 +186,9 @@ def serialize_to_json(
             if cls._is_excluded(value, key, object_to_serialize):
                 continue
 
-            if key in decorated_fields:
+            if key == 'timezone':
+                serialized_object[key] = cls._serialize(object_to_serialize._schedule_timezone)
+            elif key in decorated_fields:

Review comment:
       We shouldn't need this here, handle it in `def _serialize` instead.

##########
File path: airflow/models/dag.py
##########
@@ -107,6 +108,40 @@ def get_last_dagrun(dag_id, session, include_externally_triggered=False):
     return query.first()
 
 
+def _get_tzname_or_offset(obj):
+    if _is_valid_pendulum_tzname(obj):
+        return obj
+    if hasattr(obj, 'name'):
+        candidate = obj.name
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    try:
+        candidate = obj.tzname(datetime.now(timezone.utc))
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    try:
+        candidate = int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+
+def _is_valid_pendulum_tzname(name):
+    if isinstance(name, str) or isinstance(name, int):
+        try:
+            return pendulum.timezone(name)

Review comment:
       ```suggestion
               pendulum.timezone(name)
               return True
   ```
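Applying both suggestions, the check returns a plain boolean and only accepts `str` or `int` input. The sketch below deliberately swaps pendulum for the standard library so it runs without extra dependencies: `zoneinfo.ZoneInfo` (Python 3.9+) validates IANA names, and `datetime.timezone` validates offsets in seconds. The name `is_valid_tzname` is hypothetical.

```python
import datetime as dt
import zoneinfo


def is_valid_tzname(name) -> bool:
    """Return True if `name` is a known IANA zone name or a valid UTC offset in seconds."""
    # bool is a subclass of int; treat it as invalid rather than as 0 or 1 seconds.
    if isinstance(name, bool) or not isinstance(name, (str, int)):
        return False
    try:
        if isinstance(name, int):
            # datetime.timezone only accepts offsets strictly within +/- 24 hours.
            dt.timezone(dt.timedelta(seconds=name))
        else:
            # Raises ZoneInfoNotFoundError for unknown keys, ValueError for malformed ones.
            zoneinfo.ZoneInfo(name)
        return True
    except (ValueError, zoneinfo.ZoneInfoNotFoundError):
        return False
```

Returning `True` explicitly (instead of the timezone object) keeps the predicate's contract unambiguous, which is the point of the second suggestion.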

##########
File path: airflow/models/dag.py
##########
@@ -2209,7 +2249,7 @@ def get_num_task_instances(dag_id, task_ids=None, states=None, session=None):
     def get_serialized_fields(cls):
         """Stringified DAGs and operators contain exactly these fields."""
         if not cls.__serialized_fields:
-            cls.__serialized_fields = frozenset(vars(DAG(dag_id='test')).keys()) - {
+            cls.__serialized_fields = frozenset(set(vars(DAG(dag_id='test')).keys())) - {

Review comment:
       Unnecessary change if we keep the timezone as an actual property.

##########
File path: airflow/models/dag.py
##########
@@ -304,14 +339,15 @@ def __init__(
 
         # set timezone from start_date
         if start_date and start_date.tzinfo:
-            self.timezone = start_date.tzinfo
+            self._schedule_timezone = _get_tzname_or_offset(start_date.tzinfo)
         elif 'start_date' in self.default_args and self.default_args['start_date']:
             if isinstance(self.default_args['start_date'], str):
                 self.default_args['start_date'] = timezone.parse(self.default_args['start_date'])
-            self.timezone = self.default_args['start_date'].tzinfo
+            if self.default_args['start_date'] and self.default_args['start_date'].tzinfo:
+                self._schedule_timezone = _get_tzname_or_offset(self.default_args['start_date'].tzinfo)
 
-        if not hasattr(self, 'timezone') or not self.timezone:
-            self.timezone = settings.TIMEZONE
+        if not hasattr(self, '_schedule_timezone'):
+            self._schedule_timezone = _get_tzname_or_offset(settings.TIMEZONE)

Review comment:
       Does any of this actually need changing?
   
   Can't we keep `self.timezone` as an actual timezone object?

##########
File path: airflow/utils/timezone.py
##########
@@ -184,3 +185,38 @@ def coerce_datetime(v: Union[None, dt.datetime, DateTime]) -> Optional[DateTime]
     if v.tzinfo is None:
         v = make_aware(v)
     return pendulum.instance(v)
+
+
+def _get_tzname_or_offset(obj):
+    if _is_valid_pendulum_tzname(obj):
+        return obj
+    if hasattr(obj, 'name'):
+        candidate = obj.name
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    try:
+        candidate = obj.tzname(dt.datetime.now(dt.timezone.utc))
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    try:
+        candidate = int(obj.utcoffset(dt.datetime.now(dt.timezone.utc)).total_seconds())

Review comment:
       > The risk is that someone uses some custom datetime.tzinfo with a custom name with no equivalent in pendulum
   
   I think so long as we document "don't do this", I'm okay with this. The chance of someone creating a custom timezone seems _very_ slim given Airflow's typical users.
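To make that risk concrete, here is a standard-library sketch of the fallback order used by `_get_tzname_or_offset` (the value itself, then `.name`, then `tzname()`, then `utcoffset()` in whole seconds). Two things are assumptions for illustration: the validity check is passed in as a parameter instead of calling pendulum, and `now` is a parameter so the result is deterministic. The custom `OfficeTime` class is hypothetical. A custom `tzinfo` whose name is not recognised ends up serialized as whatever offset it has at serialization time, so any DST rules it encodes are silently lost.

```python
import datetime as dt
from typing import Any, Callable, Optional, Union

TzNameOrOffset = Union[str, int]


def get_tzname_or_offset(
    obj: Any,
    is_valid: Callable[[Any], bool],
    now: Optional[dt.datetime] = None,
) -> TzNameOrOffset:
    """Reduce `obj` to a zone name or a UTC offset in seconds, trying each source in turn."""
    if is_valid(obj):  # already a serializable name or offset
        return obj
    candidate = getattr(obj, "name", None)  # pendulum timezones expose .name
    if candidate is not None and is_valid(candidate):
        return candidate
    if now is None:
        now = dt.datetime.now(dt.timezone.utc)
    try:
        candidate = obj.tzname(now)
        if is_valid(candidate):
            return candidate
    except (AttributeError, NotImplementedError, ValueError):
        pass
    try:
        offset = obj.utcoffset(now)
        if offset is not None:
            candidate = int(offset.total_seconds())
            if is_valid(candidate):
                return candidate
    except (AttributeError, NotImplementedError, ValueError):
        pass
    raise ValueError(f"Can't get a timezone name or offset from {obj!r}")


class OfficeTime(dt.tzinfo):
    """Hypothetical custom tzinfo: UTC+2 from April to September, UTC+1 otherwise."""

    def utcoffset(self, d):
        return dt.timedelta(hours=2 if d is not None and 4 <= d.month <= 9 else 1)

    def dst(self, d):
        return dt.timedelta(hours=1 if d is not None and 4 <= d.month <= 9 else 0)

    def tzname(self, d):
        return "OfficeTime"
```

Given a predicate that does not recognise the name "OfficeTime", the same `OfficeTime()` object serializes to `7200` when `now` is in June and to `3600` when `now` is in December, so a DAG deserialized from either payload would run on a fixed offset all year.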

##########
File path: airflow/utils/timezone.py
##########
@@ -184,3 +185,38 @@ def coerce_datetime(v: Union[None, dt.datetime, DateTime]) -> Optional[DateTime]
     if v.tzinfo is None:
         v = make_aware(v)
     return pendulum.instance(v)
+
+
+def _get_tzname_or_offset(obj):
+    if _is_valid_pendulum_tzname(obj):
+        return obj
+    if hasattr(obj, 'name'):
+        candidate = obj.name
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    try:
+        candidate = obj.tzname(dt.datetime.now(dt.timezone.utc))
+        if _is_valid_pendulum_tzname(candidate):
+            return candidate
+    except (NotImplementedError, ValueError):
+        pass
+
+    try:
+        candidate = int(obj.utcoffset(dt.datetime.now(dt.timezone.utc)).total_seconds())

Review comment:
       https://airflow.apache.org/docs/apache-airflow/stable/timezone.html?highlight=timezone#time-zone-aware-dags would be the place to put such a comment.







[GitHub] [airflow] kaxil commented on a change in pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r677371540



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -284,7 +286,7 @@ def _deserialize(cls, encoded_var: Any) -> Any:
         elif type_ == DAT.TIMEDELTA:
             return datetime.timedelta(seconds=var)
         elif type_ == DAT.TIMEZONE:
-            return Timezone(var)
+            return pendulum.timezone(var)

Review comment:
       @ecerulm ^^ Can you take a look at this suggestion, please, and rebase onto the latest main at the same time? Thanks.







[GitHub] [airflow] ecerulm commented on pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ecerulm commented on pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#issuecomment-870599533


   I moved the functions to the `airflow.utils.timezone` removed all changes from `airflow.models.dag` and keep everything in `airflow.serializaton.serialized_objects` 





[GitHub] [airflow] ashb commented on a change in pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r664383462



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -284,7 +286,7 @@ def _deserialize(cls, encoded_var: Any) -> Any:
         elif type_ == DAT.TIMEDELTA:
             return datetime.timedelta(seconds=var)
         elif type_ == DAT.TIMEZONE:
-            return Timezone(var)
+            return pendulum.timezone(var)

Review comment:
       For consistency with `deserialize_dag`, should this be
   
   ```suggestion
               return cls._deserialize_timezone(var)
   ```
   
   (I think the functionality is identical.)







[GitHub] [airflow] ecerulm commented on pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ecerulm commented on pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#issuecomment-869208645


   >It’s not clear to me why a new argument is needed to solve this.
   >....
   > and the problem’s solution is the same regardless of it.
   
   Well, that's exactly the issue. The **partial** solution is the same, but I believe the only true solution is to make the user specify the timezone as a string or int instead of trying to derive it from a `datetime.tzinfo` (or pendulum's subclasses). But, yes, you are right: there is no need to introduce that new argument just to get the equivalent of the #16631 implementation.
   
   I just tried to create an AIP at https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals but I don't have permission to do so.
   
   Anyway, I'll wait for @ashb's comments as well, because first somebody needs to decide whether we want to handle the problem in `DAG.__init__()`, so that serialization/deserialization is simpler (as in this PR), or at deserialization time (as in #16631).





[GitHub] [airflow] ashb commented on a change in pull request #16678: Serialize dag timezone to a tzname or offset

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r660642129



##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -27,6 +27,8 @@
 import pendulum
 from dateutil import relativedelta
 
+from airflow.utils.timezone import _get_tzname_or_offset

Review comment:
       ```suggestion
   from airflow.utils.timezone import get_tzname_or_offset
   ```
   
   If it's called from outside the module, it shouldn't have a `_` prefix.

##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -228,7 +230,7 @@ def _serialize(cls, var: Any) -> Any:  # Unfortunately there is no support for r
         elif isinstance(var, datetime.timedelta):
             return cls._encode(var.total_seconds(), type_=DAT.TIMEDELTA)
         elif isinstance(var, Timezone):
-            return cls._encode(str(var.name), type_=DAT.TIMEZONE)
+            return cls._encode(_get_tzname_or_offset(var), type_=DAT.TIMEZONE)

Review comment:
       ```suggestion
               return cls._encode(get_tzname_or_offset(var), type_=DAT.TIMEZONE)
   ```
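For reference, a sketch of the resulting round trip: the serializer stores the reduced name or offset under the timezone type tag, and the deserializer turns it back into a `tzinfo`. The `"__type"`/`"__var"` keys and the `"timezone"` tag are assumptions modelled on Airflow's `Encoding` and `DagAttributeTypes` enums, and `datetime.timezone` stands in for pendulum. Only UTC and fixed offsets are handled, so the sketch needs no tz database; `encode_timezone` and `decode_timezone` are hypothetical names.

```python
import datetime as dt
import json
from typing import Any, Dict

TYPE_KEY, VAR_KEY, TIMEZONE_TAG = "__type", "__var", "timezone"  # assumed encoding keys


def encode_timezone(tz: dt.timezone) -> Dict[str, Any]:
    """Store a fixed-offset timezone as JSON-friendly data: "UTC" or seconds east of UTC."""
    offset = tz.utcoffset(None)
    value = "UTC" if offset == dt.timedelta(0) else int(offset.total_seconds())
    return {TYPE_KEY: TIMEZONE_TAG, VAR_KEY: value}


def decode_timezone(encoded: Dict[str, Any]) -> dt.timezone:
    """Inverse of encode_timezone; accepts the name "UTC" or an int offset in seconds."""
    if encoded[TYPE_KEY] != TIMEZONE_TAG:
        raise ValueError(f"not a timezone payload: {encoded!r}")
    value = encoded[VAR_KEY]
    if value == "UTC":
        return dt.timezone.utc
    if isinstance(value, int) and not isinstance(value, bool):
        return dt.timezone(dt.timedelta(seconds=value))
    raise ValueError(f"unsupported timezone value: {value!r}")
```

Because the encoded value is always a plain `str` or `int`, it survives `json.dumps`/`json.loads` unchanged, which a raw `tzinfo` object would not.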







[GitHub] [airflow] ecerulm commented on a change in pull request #16678: WIP: Solve timezone serialization by adding a schedule_timezone parameter to DAG

Posted by GitBox <gi...@apache.org>.
ecerulm commented on a change in pull request #16678:
URL: https://github.com/apache/airflow/pull/16678#discussion_r659855128



##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
         self.fileloc = back.f_code.co_filename if back else ""
         self.task_dict: Dict[str, BaseOperator] = {}
 
-        # set timezone from start_date
+        # set timezone from schedule_timezone or start_date
+        if schedule_timezone:
+            self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)

Review comment:
       I don't think we really want to check that the `start_date` time zone matches the `schedule_timezone`. That's why I wanted to introduce `schedule_timezone`: to avoid having to rely on inferring things from `start_date.tzinfo`.
   
   There is no problem with specifying the `start_date` in one timezone, the `end_date` in another, and yet another timezone for `schedule_timezone`. I can see people (myself included) using an ISO 8601 string like `2021-06-01T12:00:00+01:00` for `start_date` and `schedule_timezone='Europe/Stockholm'`.
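The example in this comment can be checked with the standard library alone: parsing such an ISO 8601 string yields a fixed-offset `datetime.timezone` whose only "name" is `UTC+01:00`, so nothing about `Europe/Stockholm` (or its DST rules) can be recovered from `start_date.tzinfo`. The helper name `describe_start_date` is hypothetical, and Airflow's own parsing goes through pendulum, whose fixed-offset type may report its name differently.

```python
import datetime as dt


def describe_start_date(iso_string: str) -> dict:
    """Report what timezone information an ISO 8601 start_date string actually carries."""
    start = dt.datetime.fromisoformat(iso_string)
    tz = start.tzinfo
    if tz is None:
        return {"aware": False}
    return {
        "aware": True,
        "tzinfo_type": type(tz).__name__,
        "tzname": tz.tzname(start),
        "utc_offset_seconds": int(tz.utcoffset(start).total_seconds()),
        "utc_instant": start.astimezone(dt.timezone.utc).isoformat(),
    }
```

For `"2021-06-01T12:00:00+01:00"` this reports a `timezone` object named `UTC+01:00` with an offset of 3600 seconds. On 2021-06-01 Stockholm is on summer time (UTC+02:00), so this `start_date` and `schedule_timezone='Europe/Stockholm'` legitimately use different offsets within the same DAG.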

##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
         self.fileloc = back.f_code.co_filename if back else ""
         self.task_dict: Dict[str, BaseOperator] = {}
 
-        # set timezone from start_date
+        # set timezone from schedule_timezone or start_date
+        if schedule_timezone:
+            self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)

Review comment:
       But anyway, I should remove `schedule_timezone` from the PR, right? It changes the public API of the DAG, and that requires an AIP.

##########
File path: airflow/models/dag.py
##########
@@ -422,6 +428,38 @@ def __exit__(self, _type, _value, _tb):
 
     # /Context Manager ----------------------------------------------
 
+    def _get_tzname_or_offset(self, obj):

Review comment:
       OK, I moved them to module level.

##########
File path: airflow/models/dag.py
##########
@@ -422,6 +428,38 @@ def __exit__(self, _type, _value, _tb):
 
     # /Context Manager ----------------------------------------------
 
+    def _get_tzname_or_offset(self, obj):
+        if self._is_valid_pendulum_tz_id(obj):
+            return obj
+        if hasattr(obj, 'name'):
+            candidate = obj.name
+            if self._is_valid_pendulum_tz_id(candidate):
+                return candidate
+        try:
+            candidate = obj.tzname(datetime.now(timezone.utc))
+            if self._is_valid_pendulum_tz_id(candidate):
+                return candidate
+        except (NotImplementedError, ValueError):
+            pass
+
+        try:
+            candidate = int(obj.utcoffset(datetime.now(timezone.utc)).total_seconds())
+            if self._is_valid_pendulum_tz_id(candidate):
+                return candidate
+        except (NotImplementedError, ValueError):
+            pass
+
+        raise ValueError(f"Can't get a timezone name or offset from {obj}")
+
+    def _is_valid_pendulum_tz_id(self, id):

Review comment:
       OK, renamed them to `_is_valid_pendulum_tzname` and `name`.

##########
File path: airflow/models/dag.py
##########
@@ -302,16 +304,20 @@ def __init__(
         self.fileloc = back.f_code.co_filename if back else ""
         self.task_dict: Dict[str, BaseOperator] = {}
 
-        # set timezone from start_date
+        # set timezone from schedule_timezone or start_date
+        if schedule_timezone:
+            self.schedule_timezone = self._get_tzname_or_offset(schedule_timezone)

Review comment:
       OK, removed `schedule_timezone` from the DAG constructor.

##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -135,7 +136,7 @@
                 'label': 'custom_task',
             },
         ],
-        "timezone": "UTC",
+        "schedule_timezone": "UTC",

Review comment:
       OK, I managed to keep it as `timezone` in the serialized object, but this is getting more and more complex, and more hacky than the alternative, #16631.



