You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2020/11/25 22:37:54 UTC
[airflow] 05/07: TimeSensor should respect DAG timezone (#9882)
This is an automated email from the ASF dual-hosted git repository.
dimberman pushed a commit to branch fix-resource-backcompat
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit bbcbb711d20d966fd62beb6f52118eabbe4a27e0
Author: zikun <33...@users.noreply.github.com>
AuthorDate: Tue Jul 21 00:19:08 2020 +0800
TimeSensor should respect DAG timezone (#9882)
(cherry picked from commit 9c518fe937f8dc5e35908be96bd075c4ff666755)
---
UPDATING.md | 6 ++---
airflow/sensors/time_sensor.py | 2 +-
tests/sensors/test_time_sensor.py | 52 +++++++++++++++++++++++++++++++++++++++
3 files changed, 56 insertions(+), 4 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index faa75fd..e2285df 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -63,12 +63,12 @@ https://developers.google.com/style/inclusive-documentation
-->
## Airflow 1.10.13
-### TimeSensor will consider default_timezone setting.
+### TimeSensor is now timezone aware
Previously `TimeSensor` always compared the `target_time` with the current time in UTC.
-Now it will compare `target_time` with the current time in the timezone set by `default_timezone` under the `core`
-section of the config.
+Now it will compare `target_time` with the current time in the timezone of the DAG,
+defaulting to the `default_timezone` in the global config.
### Removed Kerberos support for HDFS hook
diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py
index d26c32d..5c41c2c 100644
--- a/airflow/sensors/time_sensor.py
+++ b/airflow/sensors/time_sensor.py
@@ -37,4 +37,4 @@ class TimeSensor(BaseSensorOperator):
def poke(self, context):
self.log.info('Checking if the time (%s) has come', self.target_time)
- return timezone.make_naive(timezone.utcnow()).time() > self.target_time
+ return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time
diff --git a/tests/sensors/test_time_sensor.py b/tests/sensors/test_time_sensor.py
new file mode 100644
index 0000000..c08bdd1
--- /dev/null
+++ b/tests/sensors/test_time_sensor.py
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, time
+
+import pendulum
+from parameterized import parameterized
+
+from airflow.models.dag import DAG
+from airflow.sensors.time_sensor import TimeSensor
+from airflow.utils import timezone
+from tests.compat import patch
+
+DEFAULT_TIMEZONE = "Asia/Singapore" # UTC+08:00
+DEFAULT_DATE_WO_TZ = datetime(2015, 1, 1)
+DEFAULT_DATE_WITH_TZ = datetime(
+ 2015, 1, 1, tzinfo=pendulum.tz.timezone(DEFAULT_TIMEZONE)
+)
+
+
+@patch(
+ "airflow.sensors.time_sensor.timezone.utcnow",
+ return_value=timezone.datetime(2020, 1, 1, 23, 0).replace(tzinfo=timezone.utc),
+)
+class TestTimeSensor:
+ @parameterized.expand(
+ [
+ ("UTC", DEFAULT_DATE_WO_TZ, True),
+ ("UTC", DEFAULT_DATE_WITH_TZ, False),
+ (DEFAULT_TIMEZONE, DEFAULT_DATE_WO_TZ, False),
+ ]
+ )
+ def test_timezone(self, mock_utcnow, default_timezone, start_date, expected):
+ with patch("airflow.settings.TIMEZONE", pendulum.timezone(default_timezone)):
+ dag = DAG("test", default_args={"start_date": start_date})
+ op = TimeSensor(task_id="test", target_time=time(10, 0), dag=dag)
+ assert op.poke(None) == expected