You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/29 08:54:27 UTC

[airflow] branch main updated: Fix BranchDateTimeOperator to be timezone-awareness-insensitive (#25944)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 363b768faa Fix BranchDateTimeOperator to be timezone-awareness-insensitive (#25944)
363b768faa is described below

commit 363b768faa8158e8f00fe83af7c792e7d3504ed0
Author: Jarek Potiuk <ja...@polidea.com>
AuthorDate: Mon Aug 29 10:54:19 2022 +0200

    Fix BranchDateTimeOperator to be timezone-awareness-insensitive (#25944)
    
    * Fix BranchDateTimeOperator to be timezone-awareness-insensitive
    
    The BranchDateTimeOperator was sensitive to whether timezone-aware
    or timezone-naive parameters were passed to it. In fact, it behaved
    somewhat unpredictably - if use_task_logical_date was used, the
    lower/upper ranges were supposed to be timezone-aware, but when
    "now()" was used, the ranges were supposed to be timezone-naive.
    One of our examples was broken because it was comparing naive and
    aware datetimes.
    
    This PR coerces all values to timezone aware Pendulum datetime using
    the timezone of the Dag, which makes it insensitive to whether the
    aware or naive ranges have been used.
    
    Also, we missed an example in the howto showing logical date usage
    (which was rather strange, as the logical date is the only reasonable
    usage of the operator - using utcnow() makes the DAG essentially
    non-idempotent - its result depends on when the task is run, which
    might make sense in some cases but most of the time is something
    that should be discouraged).
    
    The documentation has been updated to explain that.
    
    * Also use tz when converting non-Pendulum datetime
    
    Plus a better written docstring.
    
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
 .../example_branch_datetime_operator.py            | 25 +++++++++++++
 airflow/operators/datetime.py                      |  8 +++--
 airflow/utils/timezone.py                          | 23 +++++++-----
 docs/apache-airflow/howto/operator/datetime.rst    | 41 ++++++++++++++++++++--
 4 files changed, 83 insertions(+), 14 deletions(-)

diff --git a/airflow/example_dags/example_branch_datetime_operator.py b/airflow/example_dags/example_branch_datetime_operator.py
index 1f0e603da3..dcbee0cef1 100644
--- a/airflow/example_dags/example_branch_datetime_operator.py
+++ b/airflow/example_dags/example_branch_datetime_operator.py
@@ -76,3 +76,28 @@ cond2 = BranchDateTimeOperator(
 # Run empty_task_1 if cond2 executes between 15:00:00, and 00:00:00 of the following day
 cond2 >> [empty_task_12, empty_task_22]
 # [END howto_branch_datetime_operator_next_day]
+
+dag3 = DAG(
+    dag_id="example_branch_datetime_operator_3",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    catchup=False,
+    tags=["example"],
+    schedule="@daily",
+)
+# [START howto_branch_datetime_operator_logical_date]
+empty_task_13 = EmptyOperator(task_id='date_in_range', dag=dag3)
+empty_task_23 = EmptyOperator(task_id='date_outside_range', dag=dag3)
+
+cond3 = BranchDateTimeOperator(
+    task_id='datetime_branch',
+    use_task_logical_date=True,
+    follow_task_ids_if_true=['date_in_range'],
+    follow_task_ids_if_false=['date_outside_range'],
+    target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
+    target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
+    dag=dag3,
+)
+
+# Run empty_task_13 if cond3's logical date falls between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
+cond3 >> [empty_task_13, empty_task_23]
+# [END howto_branch_datetime_operator_logical_date]
diff --git a/airflow/operators/datetime.py b/airflow/operators/datetime.py
index 81e68e6515..6ffa6c7655 100644
--- a/airflow/operators/datetime.py
+++ b/airflow/operators/datetime.py
@@ -77,11 +77,13 @@ class BranchDateTimeOperator(BaseBranchOperator):
 
     def choose_branch(self, context: Context) -> Union[str, Iterable[str]]:
         if self.use_task_logical_date:
-            now = timezone.make_naive(context["logical_date"], self.dag.timezone)
+            now = context["logical_date"]
         else:
-            now = timezone.make_naive(timezone.utcnow(), self.dag.timezone)
-
+            now = timezone.coerce_datetime(timezone.utcnow())
         lower, upper = target_times_as_dates(now, self.target_lower, self.target_upper)
+        lower = timezone.coerce_datetime(lower, self.dag.timezone)
+        upper = timezone.coerce_datetime(upper, self.dag.timezone)
+
         if upper is not None and upper < now:
             return self.follow_task_ids_if_false
 
diff --git a/airflow/utils/timezone.py b/airflow/utils/timezone.py
index 98789adfd7..7129fbcbc6 100644
--- a/airflow/utils/timezone.py
+++ b/airflow/utils/timezone.py
@@ -206,28 +206,35 @@ def parse(string: str, timezone=None) -> DateTime:
 
 
 @overload
-def coerce_datetime(v: None) -> None:
+def coerce_datetime(v: None, tz: Optional[dt.tzinfo] = None) -> None:
     ...
 
 
 @overload
-def coerce_datetime(v: DateTime) -> DateTime:
+def coerce_datetime(v: DateTime, tz: Optional[dt.tzinfo] = None) -> DateTime:
     ...
 
 
 @overload
-def coerce_datetime(v: dt.datetime) -> DateTime:
+def coerce_datetime(v: dt.datetime, tz: Optional[dt.tzinfo] = None) -> DateTime:
     ...
 
 
-def coerce_datetime(v: Optional[dt.datetime]) -> Optional[DateTime]:
-    """Convert whatever is passed in to an timezone-aware ``pendulum.DateTime``."""
+def coerce_datetime(v: Optional[dt.datetime], tz: Optional[dt.tzinfo] = None) -> Optional[DateTime]:
+    """Convert ``v`` into a timezone-aware ``pendulum.DateTime``.
+
+    * If ``v`` is *None*, *None* is returned.
+    * If ``v`` is a naive datetime, it is converted to an aware Pendulum DateTime.
+    * If ``v`` is an aware datetime, it is converted to a Pendulum DateTime.
+      Note that ``tz`` is **not** taken into account in this case; the datetime
+      will maintain its original tzinfo!
+    """
     if v is None:
         return None
     if isinstance(v, DateTime):
-        return v if v.tzinfo else make_aware(v)
-    # Only dt.datetime is left here
-    return pendulum.instance(v if v.tzinfo else make_aware(v))
+        return v if v.tzinfo else make_aware(v, tz)
+    # Only dt.datetime is left here.
+    return pendulum.instance(v if v.tzinfo else make_aware(v, tz))
 
 
 def td_format(td_object: Union[None, dt.timedelta, float, int]) -> Optional[str]:
diff --git a/docs/apache-airflow/howto/operator/datetime.rst b/docs/apache-airflow/howto/operator/datetime.rst
index 37f18b5358..ac8644fe85 100644
--- a/docs/apache-airflow/howto/operator/datetime.rst
+++ b/docs/apache-airflow/howto/operator/datetime.rst
@@ -22,18 +22,53 @@
 BranchDateTimeOperator
 ======================
 
-Use the :class:`~airflow.operators.datetime.BranchDateTimeOperator` to branch into one of two execution paths depending on whether the date and/or time of execution falls into the range given by two target arguments.
+Use the :class:`~airflow.operators.datetime.BranchDateTimeOperator` to branch into one of two execution paths
+depending on whether the time falls into the range given by two target arguments.
+
+This operator has two modes. The first mode uses the current time (the machine clock time at the
+moment the task is executed), and the second mode uses the ``logical_date`` of the DAG run the task
+belongs to.
+
+
+Usage with current time
+-----------------------
+
+This usage might be useful in certain situations - for example when the DAG performs cleanups and
+maintenance and is not meant to be back-filled. The "current time" makes back-filling non-idempotent,
+because the result depends on the time when the DAG actually ran. It is also potentially
+non-deterministic even when the DAG runs on schedule: some time can pass between when the DAGRun was
+scheduled and when it was executed, so even if the DAGRun was scheduled properly, the actual time used
+for the branching decision will differ from the scheduled time, and the branching decision might vary
+depending on those delays.
 
 .. exampleinclude:: /../../airflow/example_dags/example_branch_datetime_operator.py
     :language: python
     :start-after: [START howto_branch_datetime_operator]
     :end-before: [END howto_branch_datetime_operator]
 
-The target parameters, ``target_upper`` and ``target_lower``, can receive a ``datetime.datetime``, a ``datetime.time``, or ``None``. When a ``datetime.time`` object is used, it will be combined with the current date in order to allow comparisons with it. In the event that ``target_upper`` is set to a ``datetime.time`` that occurs before the given ``target_lower``, a day will be added to ``target_upper``. This is done to allow for time periods that span over two dates.
+The target parameters, ``target_upper`` and ``target_lower``, can receive a ``datetime.datetime``,
+a ``datetime.time``, or ``None``. When a ``datetime.time`` object is used, it will be combined with
+the date being compared (the current date, or the date of the ``logical_date``) to allow comparisons.
+In the event that ``target_upper`` is set to a ``datetime.time`` that occurs before the given
+``target_lower``, a day will be added to ``target_upper``. This allows for time periods that span two dates.
 
 .. exampleinclude:: /../../airflow/example_dags/example_branch_datetime_operator.py
     :language: python
     :start-after: [START howto_branch_datetime_operator_next_day]
     :end-before: [END howto_branch_datetime_operator_next_day]
 
-If a target parameter is set to ``None``, the operator will perform a unilateral comparison using only the non-``None`` target. Setting both ``target_upper`` and ``target_lower`` to ``None`` will raise an exception.
+If a target parameter is set to ``None``, the operator will perform a unilateral comparison using only
+the non-``None`` target. Setting both ``target_upper`` and ``target_lower`` to ``None``
+will raise an exception.
+
+Usage with logical date
+-----------------------
+
+This usage is much friendlier to "data range" processing. The ``logical_date`` does not change when the DAG
+is re-run and it is not affected by execution delays, so this approach is suitable for idempotent DAG runs
+that might be back-filled.
+
+.. exampleinclude:: /../../airflow/example_dags/example_branch_datetime_operator.py
+    :language: python
+    :start-after: [START howto_branch_datetime_operator_logical_date]
+    :end-before: [END howto_branch_datetime_operator_logical_date]