Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/08/06 20:23:19 UTC

[GitHub] [airflow] jedcunningham commented on a change in pull request #15389: AIP-40: Deferrable Tasks

jedcunningham commented on a change in pull request #15389:
URL: https://github.com/apache/airflow/pull/15389#discussion_r684460751



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1207,3 +1218,26 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = None):
                     raise
 
         return len(to_reset)
+
+    @provide_session
+    def check_trigger_timeouts(self, session: Session = None):
+        """
+        Looks at all tasks that are in the "deferred" state and whose trigger
+        or execution timeout has passed, so they can be marked as failed.
+        """
+        num_timed_out_tasks = (
+            session.query(TaskInstance)
+            .filter(TaskInstance.state == State.DEFERRED, TaskInstance.trigger_timeout < timezone.utcnow())
+            .update(
+                # We have to schedule these to fail themselves so it doesn't
+                # happen inside the scheduler.
+                {
+                    "state": State.SCHEDULED,
+                    "next_method": "__fail__",
+                    "next_kwargs": {"error": "Trigger/execution timeout"},
+                    "trigger_id": None,
+                }
+            )
+        )
+        if num_timed_out_tasks:
+            self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)

Review comment:
       Metrics in general might be worthy of a followup, but if/when that happens this would be a good candidate.
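    For example, if/when metrics land, a follow-up could emit something along these lines (just a sketch; `Stats.incr` already exists in `airflow.stats`, the metric name is hypothetical):
    ```python
    from airflow.stats import Stats

    # Hypothetical metric name: count deferred tasks that timed out in this pass.
    Stats.incr("triggerer.timed_out_tasks", num_timed_out_tasks)
    ```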

##########
File path: airflow/triggers/__init__.py
##########
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.triggers.base import BaseTrigger  # noqa
+from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger  # noqa

Review comment:
       I don't think we should import these here. In 2.0, this was removed for operators and sensors, so this feels odd to me.
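    DAG authors could then import straight from the concrete modules instead, e.g.:
    ```python
    from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
    ```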

##########
File path: airflow/jobs/triggerer_job.py
##########
@@ -0,0 +1,379 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import os
+import signal
+import sys
+import threading
+import time
+from collections import deque
+from typing import Deque, Dict, Set, Tuple, Type
+
+from airflow.compat.asyncio import create_task
+from airflow.jobs.base_job import BaseJob
+from airflow.models.trigger import Trigger
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.typing_compat import TypedDict
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.module_loading import import_string
+
+
+class TriggererJob(BaseJob):
+    """
+    TriggererJob continuously runs active triggers in asyncio, watching
+    for them to fire off their events and then dispatching that information
+    to their dependent tasks/DAGs.
+
+    It runs as two threads:
+     - The main thread does DB calls/checkins
+     - A subthread runs all the async code
+    """
+
+    __mapper_args__ = {'polymorphic_identity': 'TriggererJob'}
+
+    def __init__(self, capacity=None, *args, **kwargs):
+        # Call superclass
+        super().__init__(*args, **kwargs)
+
+        if capacity is None:
+            self.capacity = 1000  # TODO put this in a config file?

Review comment:
       Yeah, probably should be moved to the config, like the timeout interval.
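    Something along these lines would keep the current behaviour while making it configurable (a sketch; the `[triggerer]` section/option name is hypothetical, and `conf` is `airflow.configuration.conf`):
    ```python
        if capacity is None:
            # Hypothetical option; the fallback preserves today's hard-coded default of 1000.
            self.capacity = conf.getint("triggerer", "default_capacity", fallback=1000)
    ```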

##########
File path: airflow/triggers/testing.py
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, Tuple
+
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class SuccessTrigger(BaseTrigger):
+    """
+    A trigger that always succeeds immediately.
+
+    Should only be used for testing.

Review comment:
       (Should these live in `tests` then?)
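    If so, they could sit next to the tests that use them, e.g. (sketch; the placement and module path are hypothetical):
    ```python
    # e.g. in tests/jobs/test_triggerer_job.py
    from airflow.triggers.base import BaseTrigger, TriggerEvent


    class SuccessTrigger(BaseTrigger):
        """A trigger that always succeeds immediately. Test-only."""

        def serialize(self):
            return ("tests.jobs.test_triggerer_job.SuccessTrigger", {})

        async def run(self):
            yield TriggerEvent(True)
    ```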

##########
File path: tests/jobs/test_triggerer_job.py
##########
@@ -0,0 +1,327 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import sys
+import time
+
+import pytest
+
+from airflow import DAG
+from airflow.jobs.triggerer_job import TriggererJob
+from airflow.models import Trigger
+from airflow.models.taskinstance import TaskInstance
+from airflow.operators.dummy import DummyOperator
+from airflow.triggers.base import TriggerEvent
+from airflow.triggers.temporal import TimeDeltaTrigger
+from airflow.triggers.testing import FailureTrigger, SuccessTrigger
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State, TaskInstanceState
+from tests.test_utils.db import clear_db_runs
+
+
+@pytest.fixture(autouse=True)
+def clean_database():
+    """Fixture that cleans the database before and after every test."""
+    clear_db_runs()
+    yield  # Test runs here
+    clear_db_runs()
+
+
+@pytest.fixture
+def session():
+    """Fixture that provides a SQLAlchemy session"""
+    with create_session() as session:
+        yield session
+
+
+@pytest.mark.skipif(sys.version_info.minor <= 6 and sys.version_info.major <= 3, reason="No triggerer on 3.6")
+def test_is_alive():
+    """Checks the heartbeat logic"""
+    # Current time
+    triggerer_job = TriggererJob(None, heartrate=10, state=State.RUNNING)
+    assert triggerer_job.is_alive()
+
+    # Slightly old, but still fresh
+    triggerer_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20)
+    assert triggerer_job.is_alive()
+
+    # Old enough to fail
+    triggerer_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=31)
+    assert not triggerer_job.is_alive()
+
+    # Completed state should not be alive
+    triggerer_job.state = State.SUCCESS
+    triggerer_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
+    assert not triggerer_job.is_alive(), "Completed jobs even with recent heartbeat should not be alive"
+
+
+@pytest.mark.skipif(sys.version_info.minor <= 6 and sys.version_info.major <= 3, reason="No triggerer on 3.6")
+def test_capacity_decode():
+    """
+    Tests that TriggererJob correctly sets capacity to a valid value passed in as a CLI arg,
+    handles invalid args, or sets it to a default value if no arg is passed.
+    """
+    # Positive cases
+    variants = [
+        42,
+        None,
+    ]
+    for input_str in variants:
+

Review comment:
       ```suggestion
   
   ```
   nit

##########
File path: airflow/triggers/temporal.py
##########
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import datetime
+from typing import Any, Dict, Tuple
+
+import pytz
+
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.utils import timezone
+
+
+class DateTimeTrigger(BaseTrigger):
+    """
+    A trigger that fires exactly once, at the given datetime, give or take
+    a few seconds.
+
+    The provided datetime MUST be in UTC.
+    """
+
+    def __init__(self, moment: datetime.datetime):
+        super().__init__()
+        # Make sure it's in UTC
+        if moment.tzinfo is None:
+            self.moment = pytz.utc.localize(moment)
+        elif moment.tzinfo == pytz.utc or getattr(moment.tzinfo, "name", None) == "UTC":
+            self.moment = moment
+        else:
+            raise ValueError(f"The passed datetime must be in UTC, not {moment.tzinfo!r}")
+
+    def serialize(self) -> Tuple[str, Dict[str, Any]]:
+        return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
+
+    async def run(self):
+        """
+        Simple time delay loop until the relevant time is met.
+
+        We do have a two-phase delay to save some cycles, but sleeping is so
+        cheap anyway that it's pretty loose.
+        """
+        # Sleep an hour at a time while it's more than 2 hours away
+        while self.moment - timezone.utcnow() > datetime.timedelta(hours=2):
+            await (asyncio.sleep(3600))
+        # Sleep a second at a time otherwise
+        while self.moment > timezone.utcnow():
+            await asyncio.sleep(1)

Review comment:
       Not sure a comment was actually added (and I had the same question).

##########
File path: docs/apache-airflow/concepts/deferring.rst
##########
@@ -0,0 +1,163 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Deferrable Operators & Triggers
+===============================
+
+Standard :doc:`Operators <operators>` and :doc:`Sensors <sensors>` take up a full *worker slot* for the entire time they are running, even if they are idle; for example, if you only have 100 worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor that's currently running but idle, then you *cannot run anything else* - even though your entire Airflow cluster is essentially idle. ``reschedule`` mode for Sensors solves some of this, allowing Sensors to only run at fixed intervals, but it is inflexible and only allows using time as the reason to resume, not anything else.
+
+This is where *Deferrable Operators* come in. A deferrable operator is one that is written with the ability to suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to something called a *Trigger*. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors.
+
+*Triggers* are small, asynchronous pieces of Python code designed to be run all together in a single Python process; because they are asynchronous, they are able to all co-exist efficiently. As an overview of how this process works:
+
+* A task instance (running operator) gets to a point where it has to wait, and defers itself with a trigger tied to the event that should resume it. This frees up the worker to run something else.
+* The new Trigger instance is registered inside Airflow, and picked up by a *triggerer* process
+* The trigger is run until it fires, at which point its source task is re-scheduled
+* The scheduler queues the task to resume on a worker node
+
+Using deferrable operators as a DAG author is almost transparent; writing them, however, takes a bit more work.
+
+.. note::
+
+    Deferrable Operators & Triggers rely on more recent ``asyncio`` features, and as a result only work
+    on Python 3.7 or higher.
+
+
+Using Deferrable Operators
+--------------------------
+
+If all you wish to do is use pre-written Deferrable Operators (such as ``TimeSensorAsync``, which comes with Airflow), then there are only two steps you need:
+
+* Ensure your Airflow installation is running at least one ``triggerer`` process, as well as the normal ``scheduler``
+* Use deferrable operators/sensors in your DAGs
+
+That's it; everything else will be automatically handled for you. If you're upgrading existing DAGs, we even provide some API-compatible sensor variants (e.g. ``TimeSensorAsync`` for ``TimeSensor``) that you can swap into your DAG with no other changes required.
+
+Note that you cannot yet use the deferral ability from inside custom PythonOperator/TaskFlow Python functions; it is only available to traditional, class-based Operators at the moment.
+
+
+Writing Deferrable Operators
+----------------------------
+
+Writing a deferrable operator takes a bit more work. There are some main points to consider:
+
+* Your Operator must defer itself based on a Trigger. If there is a Trigger in core Airflow you can use, great; otherwise, you will have to write one.
+* Your Operator will be stopped and removed from its worker while deferred, and no state will persist automatically. You can persist state by asking Airflow to resume you at a certain method or pass certain kwargs, but that's it.
+* You can defer multiple times, and you can defer before/after your Operator does significant work, or only defer if certain conditions are met (e.g. a system does not have an immediate answer). Deferral is entirely under your control.
+* Any Operator can defer; no special marking on its class is needed, and it's not limited to Sensors.
+
+
+Triggering Deferral
+~~~~~~~~~~~~~~~~~~~
+
+If you want to trigger deferral, at any place in your Operator you can call ``self.defer(trigger, method_name, kwargs, timeout)``, which will raise a special exception that Airflow will catch. The arguments are:
+
+* ``trigger``: An instance of a Trigger that you wish to defer on. It will be serialized into the database.
+* ``method_name``: The method name on your Operator you want Airflow to call when it resumes, other than ``execute``.
+* ``kwargs``: Additional keyword arguments to pass to the method when it is called. Optional, defaults to ``{}``.
+* ``timeout``: A timedelta that specifies a timeout after which this deferral will fail, and fail the task instance. Optional, defaults to ``None``, meaning no timeout.
+
+When you opt to defer, your Operator will *stop executing at that point and be removed from its current worker*. No state - such as local variables, or attributes set on ``self`` - will persist, and when your Operator is resumed it will be a *brand new instance* of it. The only way you can pass state from the old instance of the Operator to the new one is via ``method_name`` and ``kwargs``.
+
+When your Operator is resumed, you will find an ``event`` item added to the kwargs passed to it, which contains the payload from the trigger event that resumed your Operator. Depending on the trigger, this may be useful to your operator (e.g. it's a status code or URL to fetch results), or it may not be important (it's just a datetime). Your ``method_name`` method, however, *must* accept ``event`` as a keyword argument.
+
+If your Operator returns from either its first ``execute()`` method when it's new, or a subsequent method specified by ``method_name``, it will be considered complete and will finish executing.
+
+You are free to set ``method_name`` to ``execute`` if you want your Operator to have one entrypoint, but it, too, will have to accept ``event`` as an optional keyword argument.
+
+Here's a basic example of how a sensor might trigger deferral::
+
+    class WaitOneHourSensor(BaseSensorOperator):
+        def execute(self, context):
+            self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete")
+
+        def execute_complete(self, context, event=None):
+            # We have no more work to do here. Mark as complete.
+            return
+
+This Sensor is literally just a thin wrapper around the Trigger, so all it does is defer to the trigger, and specify a different method to come back to when the trigger fires - which, as it returns immediately, marks the Sensor as successful.
+
+Under the hood, ``self.defer`` raises the ``TaskDeferred`` exception, so it will work anywhere inside your Operator's code, even buried many nested calls deep inside ``execute()``. You are free to raise ``TaskDeferred`` manually if you wish; it takes the same arguments as ``self.defer``.
+
+Note that ``execution_timeout`` on Operators is considered over the *total runtime*, not individual executions in-between deferrals - this means that if ``execution_timeout`` is set, an Operator may fail while it's deferred or while it's running after a deferral, even if it's only been resumed for a few seconds.
+
+
+Writing Triggers
+~~~~~~~~~~~~~~~~
+
+A Trigger is written as a class that inherits from ``BaseTrigger``, and implements three methods:
+
+* ``__init__``, to receive arguments from Operators instantiating it
+* ``run``, an asynchronous method that runs its logic and yields one or more ``TriggerEvent`` instances as an asynchronous generator
+* ``serialize``, which returns the information needed to re-construct this trigger, as a tuple of the classpath, and keyword arguments to pass to ``__init__``
+
+There's also some design constraints to be aware of:
+
+* The ``run`` method *must be asynchronous* (using Python's asyncio), and correctly ``await`` whenever it does a blocking operation.
+* ``run`` must ``yield`` its TriggerEvents, not return them. If it returns before yielding at least once event, Airflow will consider this an error and fail any Task Instances waiting on it. If it throws an exception, Airflow will also fail any dependent task instances.

Review comment:
       ```suggestion
   * ``run`` must ``yield`` its TriggerEvents, not return them. If it returns before yielding at least one event, Airflow will consider this an error and fail any Task Instances waiting on it. If it throws an exception, Airflow will also fail any dependent task instances.
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -699,6 +699,11 @@ def _run_scheduler_loop(self) -> None:
             self.adopt_or_reset_orphaned_tasks,
         )
 
+        timers.call_regular_interval(
+            conf.getfloat('scheduler', 'trigger_timeout_check_interval', fallback=15.0),

Review comment:
       ```suggestion
               conf.getfloat('scheduler', 'trigger_timeout_check_interval'),
   ```
   
    This default should be moved into `airflow/config_templates/config.yml` so it gets properly documented everywhere. You then won't need the fallback here because it'll use the value from `default_airflow.cfg`.

##########
File path: airflow/sensors/time_sensor.py
##########
@@ -35,3 +36,23 @@ def __init__(self, *, target_time, **kwargs):
     def poke(self, context):
         self.log.info('Checking if the time (%s) has come', self.target_time)
         return timezone.make_naive(timezone.utcnow(), self.dag.timezone).time() > self.target_time
+
+
+class TimeSensorAsync(BaseSensorOperator):
+    """
+    Waits until the specified time of the day, freeing up a worker slot while
+    it is waiting.
+
+    :param target_time: time after which the job succeeds
+    :type target_time: datetime.time
+    """
+
+    def __init__(self, *, target_time, **kwargs):
+        super().__init__(**kwargs)
+        self.target_time = target_time
+
+    def execute(self, context):
+        self.defer(trigger=DateTimeTrigger(moment=self.target_time), method_name="execute_complete")
+
+    def execute_complete(self, context, event=None):  # pylint: disable=unused-argument
+        """Callback for when the trigger fires - returns immediately."""

Review comment:
       nit: This is unlike the others in that it doesn't explicitly return None.
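    i.e. something like (sketch):
    ```python
    def execute_complete(self, context, event=None):  # pylint: disable=unused-argument
        """Callback for when the trigger fires - returns immediately."""
        return None
    ```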

##########
File path: docs/apache-airflow/concepts/deferring.rst
##########
@@ -0,0 +1,163 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Deferrable Operators & Triggers
+===============================
+
+Standard :doc:`Operators <operators>` and :doc:`Sensors <sensors>` take up a full *worker slot* for the entire time they are running, even if they are idle; for example, if you only have 100 worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor that's currently running but idle, then you *cannot run anything else* - even though your entire Airflow cluster is essentially idle. ``reschedule`` mode for Sensors solves some of this, allowing Sensors to only run at fixed intervals, but it is inflexible and only allows using time as the reason to resume, not anything else.
+
+This is where *Deferrable Operators* come in. A deferrable operator is one that is written with the ability to suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to something called a *Trigger*. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors.
+
+*Triggers* are small, asynchronous pieces of Python code designed to be run all together in a single Python process; because they are asynchronous, they are able to all co-exist efficiently. As an overview of how this process works:
+
+* A task instance (running operator) gets to a point where it has to wait, and defers itself with a trigger tied to the event that should resume it. This frees up the worker to run something else.
+* The new Trigger instance is registered inside Airflow, and picked up by a *triggerer* process
+* The trigger is run until it fires, at which point its source task is re-scheduled
+* The scheduler queues the task to resume on a worker node
+
+Using deferrable operators as a DAG author is almost transparent; writing them, however, takes a bit more work.
+
+.. note::
+
+    Deferrable Operators & Triggers rely on more recent ``asyncio`` features, and as a result only work
+    on Python 3.7 or higher.
+
+
+Using Deferrable Operators
+--------------------------
+
+If all you wish to do is use pre-written Deferrable Operators (such as ``TimeSensorAsync``, which comes with Airflow), then there are only two steps you need:
+
+* Ensure your Airflow installation is running at least one ``triggerer`` process, as well as the normal ``scheduler``
+* Use deferrable operators/sensors in your DAGs
+
+That's it; everything else will be automatically handled for you. If you're upgrading existing DAGs, we even provide some API-compatible sensor variants (e.g. ``TimeSensorAsync`` for ``TimeSensor``) that you can swap into your DAG with no other changes required.
+
+Note that you cannot yet use the deferral ability from inside custom PythonOperator/TaskFlow Python functions; it is only available to traditional, class-based Operators at the moment.
+
+
+Writing Deferrable Operators
+----------------------------
+
+Writing a deferrable operator takes a bit more work. There are some main points to consider:
+
+* Your Operator must defer itself based on a Trigger. If there is a Trigger in core Airflow you can use, great; otherwise, you will have to write one.
+* Your Operator will be stopped and removed from its worker while deferred, and no state will persist automatically. You can persist state by asking Airflow to resume you at a certain method or pass certain kwargs, but that's it.
+* You can defer multiple times, and you can defer before/after your Operator does significant work, or only defer if certain conditions are met (e.g. a system does not have an immediate answer). Deferral is entirely under your control.
+* Any Operator can defer; no special marking on its class is needed, and it's not limited to Sensors.
+
+
+Triggering Deferral
+~~~~~~~~~~~~~~~~~~~
+
+If you want to trigger deferral, at any place in your Operator you can call ``self.defer(trigger, method_name, kwargs, timeout)``, which will raise a special exception that Airflow will catch. The arguments are:
+
+* ``trigger``: An instance of a Trigger that you wish to defer on. It will be serialized into the database.
+* ``method_name``: The method name on your Operator you want Airflow to call when it resumes, other than ``execute``.
+* ``kwargs``: Additional keyword arguments to pass to the method when it is called. Optional, defaults to ``{}``.
+* ``timeout``: A timedelta that specifies a timeout after which this deferral will fail, and fail the task instance. Optional, defaults to ``None``, meaning no timeout.
+
+When you opt to defer, your Operator will *stop executing at that point and be removed from its current worker*. No state - such as local variables, or attributes set on ``self`` - will persist, and when your Operator is resumed it will be a *brand new instance* of it. The only way you can pass state from the old instance of the Operator to the new one is via ``method_name`` and ``kwargs``.
+
+When your Operator is resumed, you will find an ``event`` item added to the kwargs passed to it, which contains the payload from the trigger event that resumed your Operator. Depending on the trigger, this may be useful to your operator (e.g. it's a status code or URL to fetch results), or it may not be important (it's just a datetime). Your ``method_name`` method, however, *must* accept ``event`` as a keyword argument.
+
+If your Operator returns from either its first ``execute()`` method when it's new, or a subsequent method specified by ``method_name``, it will be considered complete and will finish executing.
+
+You are free to set ``method_name`` to ``execute`` if you want your Operator to have one entrypoint, but it, too, will have to accept ``event`` as an optional keyword argument.
+
+Here's a basic example of how a sensor might trigger deferral::
+
+    class WaitOneHourSensor(BaseSensorOperator):
+        def execute(self, context):
+            self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete")
+
+        def execute_complete(self, context, event=None):
+            # We have no more work to do here. Mark as complete.
+            return
+
+This Sensor is literally just a thin wrapper around the Trigger, so all it does is defer to the trigger, and specify a different method to come back to when the trigger fires - which, as it returns immediately, marks the Sensor as successful.
+
+Under the hood, ``self.defer`` raises the ``TaskDeferred`` exception, so it will work anywhere inside your Operator's code, even buried many nested calls deep inside ``execute()``. You are free to raise ``TaskDeferred`` manually if you wish; it takes the same arguments as ``self.defer``.
+
+Note that ``execution_timeout`` on Operators is considered over the *total runtime*, not individual executions in-between deferrals - this means that if ``execution_timeout`` is set, an Operator may fail while it's deferred or while it's running after a deferral, even if it's only been resumed for a few seconds.
+
+
+Writing Triggers
+~~~~~~~~~~~~~~~~
+
+A Trigger is written as a class that inherits from ``BaseTrigger``, and implements three methods:
+
+* ``__init__``, to receive arguments from Operators instantiating it
+* ``run``, an asynchronous method that runs its logic and yields one or more ``TriggerEvent`` instances as an asynchronous generator
+* ``serialize``, which returns the information needed to re-construct this trigger, as a tuple of the classpath, and keyword arguments to pass to ``__init__``
+
+There's also some design constraints to be aware of:
+
+* The ``run`` method *must be asynchronous* (using Python's asyncio), and correctly ``await`` whenever it does a blocking operation.
+* ``run`` must ``yield`` its TriggerEvents, not return them. If it returns before yielding at least once event, Airflow will consider this an error and fail any Task Instances waiting on it. If it throws an exception, Airflow will also fail any dependent task instances.
+* A Trigger *must be able to run in parallel* with other copies of itself. This can happen both when two tasks defer based on the same trigger, and also if a network partition happens and Airflow re-launches a trigger on a separated machine.
+* When events are emitted, and if your trigger is designed to emit more than one event, they *must* contain a payload that can be used to deduplicate events if the trigger is being run in multiple places. If you only fire one event, and don't want to pass information in the payload back to the Operator that deferred, you can just set the payload to ``None``.
+* A trigger may be suddenly removed from one process and started on a new one (if partitions are being changed, or a deployment is happening). You may provide an optional ``cleanup`` method that gets called when this happens.
+
+Here's the structure of a basic Trigger::
+
+
+    class DateTimeTrigger(BaseTrigger):
+
+        def __init__(self, moment):
+            super().__init__()
+            self.moment = moment
+
+        def serialize(self):
+            return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment})
+
+        async def run(self):
+            while self.moment > timezone.utcnow():
+                await asyncio.sleep(1)
+            yield TriggerEvent(self.moment)
+
+This is a very simplified version of Airflow's ``DateTimeTrigger``, and you can see several things here:
+
+* ``__init__`` and ``serialize`` are written as a pair; the Trigger is instantiated once when it is submitted by the Operator as part of its deferral request, then serialized and re-instantiated on any *triggerer* process that runs the trigger.
+* The ``run`` method is declared as an ``async def``, as it *must* be asynchronous, and uses ``asyncio.sleep`` rather than the regular ``time.sleep`` (as that would block the process).
+* When it emits its event it packs ``self.moment`` in there, so if this trigger is being run redundantly on multiple hosts, the event can be de-duplicated.
+
+Triggers can be as complex or as simple as you like provided you keep inside this contract; they are designed to be run in a highly-available fashion, auto-distributed among hosts running the *triggerer*. We encourage you to avoid any kind of persistent state in a trigger; they should get everything they need from their ``__init__``, so they can be serialized and moved around freely.
+
+If you are new to writing asynchronous Python, you should be very careful writing your ``run()`` method; Python's async model means that any code that does not correctly ``await`` when it does a blocking operation will block the *entire process*. Airflow will attempt to detect this and warn you in the triggerer logs when it happens, but we strongly suggest you set the variable ``PYTHONASYNCIODEBUG=1`` when you are writing your Trigger to enable extra checks from Python to make sure you're writing non-blocking code. Be especially careful when doing filesystem calls, as if the underlying filesystem is network-backed it may be blocking.
+
+Right now, Triggers are only used up to their first event, as they are only used for resuming deferred tasks (which happens on the first event fired). However, we plan to allow DAGs to be launched from triggers in future, which is where multi-event triggers will be more useful.
+
+
+High Availability
+-----------------
+
+Triggers are designed from the ground-up to be highly-available; if you want to run a highly-available setup, simply run multiple copies of ``triggerer`` on multiple hosts. Much like ``scheduler``, they will automatically co-exist with correct locking and HA.
+
+Depending on how much work the triggers are doing, you can fit from hundreds to tens of thousands of triggers on a single ``triggerer`` host. By default, every ``triggerer`` will have a capacity of 1000 triggers it will try to run at once; you can change this with the ``--capacity`` argument. If you have more triggers trying to run than you have capacity across all of your ``triggerer`` processes, some triggers will be delayed from running until others have completed.
+
+Airflow tries to only run triggers in one place at once, and maintains a heartbeat to all ``triggerers`` that are currently running. If a ``triggerer`` dies, or becomes partitioned from the network where Airflow's database is running, Airflow will automatically re-schedule triggers that were on that host to run elsewhere (after waiting 30 seconds for the machine to re-appear).
+
+This means it's possible, but unlikely, for triggers to run in multiple places at once; this is designed into the Trigger contract, however, and entirely expected. Airflow will de-duplicate events fired when a trigger is running in multiple places simultaneously, so this process should be transparent to your Operators.
+
+Note that every extra ``triggerer`` you run will result in an extra persistent connection to your database.
+
+
+Smart Sensors
+-------------
+
+Deferrable Operators essentially supersede :doc:`Smart Sensors <smart-sensors>`, and should be preferred for almost all situations. They do solve fundamentally the same problem; Smart Sensors, however, only work for certain Sensor workload styles, have no redundancy, and require a custom DAG to run at all times.

Review comment:
       Should we add a note on the smart sensors docs page pointing over here too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org