You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/11/25 22:37:54 UTC

[airflow] 05/07: TimeSensor should respect DAG timezone (#9882)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch fix-resource-backcompat
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bbcbb711d20d966fd62beb6f52118eabbe4a27e0
Author: zikun <33...@users.noreply.github.com>
AuthorDate: Tue Jul 21 00:19:08 2020 +0800

    TimeSensor should respect DAG timezone (#9882)
    
    (cherry picked from commit 9c518fe937f8dc5e35908be96bd075c4ff666755)
---
 UPDATING.md                       |  6 ++---
 airflow/sensors/time_sensor.py    |  2 +-
 tests/sensors/test_time_sensor.py | 52 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index faa75fd..e2285df 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -63,12 +63,12 @@ https://developers.google.com/style/inclusive-documentation
 -->
 ## Airflow 1.10.13
 
-### TimeSensor will consider default_timezone setting.
+### TimeSensor is now timezone aware
 
 Previously `TimeSensor` always compared the `target_time` with the current time in UTC.
 
-Now it will compare `target_time` with the current time in the timezone set by `default_timezone` under the `core`
-section of the config.
+Now it will compare `target_time` with the current time in the timezone of the DAG,
+defaulting to the `default_timezone` in the global config.
 
 ### Removed Kerberos support for HDFS hook
 
diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py
index d26c32d..5c41c2c 100644
--- a/airflow/sensors/time_sensor.py
+++ b/airflow/sensors/time_sensor.py
@@ -37,4 +37,4 @@ class TimeSensor(BaseSensorOperator):
 
     def poke(self, context):
         self.log.info('Checking if the time (%s) has come', self.target_time)
-        return timezone.make_naive(timezone.utcnow()).time() > self.target_time
+        return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time
diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py
new file mode 100644
index 0000000..c08bdd1
--- /dev/null
+++ b/tests/sensors/test_time_sensor.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, time
+
+import pendulum
+from parameterized import parameterized
+
+from airflow.models.dag import DAG
+from airflow.sensors.time_sensor import TimeSensor
+from airflow.utils import timezone
+from tests.compat import patch
+
+DEFAULT_TIMEZONE = "Asia/Singapore"  # UTC+08:00
+DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1)
+DEFAULT_DATE_WITH_TZ = datetime(
+    2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE)
+)
+
+
+@patch(
+    "airflow.sensors.time_sensor.timezone.utcnow",
+    return_value=timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc),
+)
+class TestTimeSensor:
+    @parameterized.expand(
+        [
+            ("UTC", DEFAULT_DATE_WO_TZ, True),
+            ("UTC", DEFAULT_DATE_WITH_TZ, False),
+            (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False),
+        ]
+    )
+    def test_timezone(self, mock_utcnow, default_timezone, start_date, expected):
+        with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
+            dag = DAG("test", default_args={"start_date": start_date})
+            op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
+            assert op.poke(None) == expected