You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/08/14 23:53:33 UTC

[airflow] 01/04: Add DateTimeSensor (#9697)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit fab690f36a928c8ebaee4e46116649d38c8e9619
Author: zikun <33...@users.noreply.github.com>
AuthorDate: Fri Jul 24 00:53:10 2020 +0800

    Add DateTimeSensor (#9697)
    
    * Add DateTimeSensor
    
    (cherry picked from commit 243b704f47df00365e3421b55d8818fc9c71196f)
---
 airflow/sensors/date_time_sensor.py    | 76 ++++++++++++++++++++++++++++++++++
 tests/sensors/test_date_time_sensor.py | 72 ++++++++++++++++++++++++++++++++
 2 files changed, 148 insertions(+)

diff --git a/airflow/sensors/date_time_sensor.py b/airflow/sensors/date_time_sensor.py
new file mode 100644
index 0000000..a62f3cb8
--- /dev/null
+++ b/airflow/sensors/date_time_sensor.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+
+
+class DateTimeSensor(BaseSensorOperator):
+    """
+    Waits until the specified datetime.
+
+    A major advantage of this sensor is idempotence for the ``target_time``.
+    It handles some cases for which ``TimeSensor`` and ``TimeDeltaSensor`` are not suited.
+
+    **Example** 1 :
+        Suppose a task needs to wait for 11am on each ``execution_date``. With
+        ``TimeSensor`` or ``TimeDeltaSensor``, all backfill tasks started at
+        1am have to wait for 10 hours. This is unnecessary, e.g. a backfill
+        task with ``{{ ds }} = '1970-01-01'`` does not need to wait because
+        ``1970-01-01T11:00:00`` has already passed.
+
+    **Example** 2 :
+        If a DAG is scheduled to run at 23:00 daily, but one of the tasks is
+        required to run at 01:00 the next day, using ``TimeSensor`` will return
+        ``True`` immediately because 23:00 > 01:00. Instead, we can do this:
+
+        .. code-block:: python
+
+            DateTimeSensor(
+                task_id='wait_for_0100',
+                target_time='{{ next_execution_date.tomorrow().replace(hour=1) }}',
+            )
+
+    :param target_time: datetime after which the job succeeds. (templated)
+    :type target_time: str or datetime.datetime
+    """
+
+    template_fields = ("target_time",)
+
+    @apply_defaults
+    def __init__(
+        self, target_time, *args, **kwargs
+    ):
+        super(DateTimeSensor, self).__init__(*args, **kwargs)
+        if isinstance(target_time, datetime.datetime):
+            self.target_time = target_time.isoformat()
+        elif isinstance(target_time, str):
+            self.target_time = target_time
+        else:
+            raise TypeError(
+                "Expected str or datetime.datetime type for target_time. Got {}".format(
+                    type(target_time)
+                )
+            )
+
+    def poke(self, context):
+        self.log.info("Checking if the time (%s) has come", self.target_time)
+        return timezone.utcnow() > timezone.parse(self.target_time)
diff --git a/tests/sensors/test_date_time_sensor.py b/tests/sensors/test_date_time_sensor.py
new file mode 100644
index 0000000..b8b81d1
--- /dev/null
+++ b/tests/sensors/test_date_time_sensor.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pytest
+from mock import patch
+from parameterized import parameterized
+
+from airflow.models.dag import DAG
+from airflow.sensors.date_time_sensor import DateTimeSensor
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2015, 1, 1)
+
+
+class TestDateTimeSensor:
+    @classmethod
+    def setup_class(cls):
+        args = {"owner": "airflow", "start_date": DEFAULT_DATE}
+        cls.dag = DAG("test_dag", default_args=args)
+
+    @parameterized.expand(
+        [
+            (
+                "valid_datetime",
+                timezone.datetime(2020, 7, 6, 13, tzinfo=timezone.utc),
+                "2020-07-06T13:00:00+00:00",
+            ),
+            ("valid_str", "20200706T210000+8", "20200706T210000+8"),
+        ]
+    )
+    def test_valid_input(self, task_id, target_time, expected):
+        op = DateTimeSensor(task_id=task_id, target_time=target_time, dag=self.dag,)
+        assert op.target_time == expected
+
+    def test_invalid_input(self):
+        with pytest.raises(TypeError):
+            DateTimeSensor(
+                task_id="test", target_time=timezone.utcnow().time(), dag=self.dag,
+            )
+
+    @parameterized.expand(
+        [
+            (
+                "poke_datetime",
+                timezone.datetime(2020, 1, 1, 22, 59, tzinfo=timezone.utc),
+                True,
+            ),
+            ("poke_str_extended", "2020-01-01T23:00:00.001+00:00", False),
+            ("poke_str_basic_with_tz", "20200102T065959+8", True),
+        ]
+    )
+    @patch(
+        "airflow.sensors.date_time_sensor.timezone.utcnow",
+        return_value=timezone.datetime(2020, 1, 1, 23, 0, tzinfo=timezone.utc),
+    )
+    def test_poke(self, task_id, target_time, expected, mock_utcnow):
+        op = DateTimeSensor(task_id=task_id, target_time=target_time, dag=self.dag)
+        assert op.poke(None) == expected