You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/29 19:07:48 UTC

[GitHub] [airflow] BasPH commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

BasPH commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r416816158



##########
File path: airflow/models/baseoperator.py
##########
@@ -179,18 +180,49 @@ class derived from this one results in the creation of a task object,
     :param pool_slots: the number of pool slots this task should use (>= 1)
         Values less than 1 are not allowed.
     :type pool_slots: int
-    :param sla: time by which the job is expected to succeed. Note that
-        this represents the ``timedelta`` after the period is closed. For
-        example if you set an SLA of 1 hour, the scheduler would send an email
-        soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
-        has not succeeded yet.
-        The scheduler pays special attention for jobs with an SLA and
-        sends alert
-        emails for sla misses. SLA misses are also recorded in the database
-        for future reference. All tasks that share the same SLA time
-        get bundled in a single email, sent soon after that time. SLA
-        notification are sent once and only once for each task instance.
+    :param sla: Deprecated in favor of ``expected_finish``.
     :type sla: datetime.timedelta
+    :param expected_duration: the maximum duration the task is allowed to take,
+        provided as a ``timedelta``. This ``timedelta`` is relative to when the
+        task actually starts running, not to the execution date. It includes

Review comment:
       Nobody expects duration to start counting from the execution_date, wouldn't mention it to above confusion.
   
   ```suggestion
           task actually starts running. It includes
   ```

##########
File path: airflow/models/baseoperator.py
##########
@@ -179,18 +180,49 @@ class derived from this one results in the creation of a task object,
     :param pool_slots: the number of pool slots this task should use (>= 1)
         Values less than 1 are not allowed.
     :type pool_slots: int
-    :param sla: time by which the job is expected to succeed. Note that
-        this represents the ``timedelta`` after the period is closed. For
-        example if you set an SLA of 1 hour, the scheduler would send an email
-        soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
-        has not succeeded yet.
-        The scheduler pays special attention for jobs with an SLA and
-        sends alert
-        emails for sla misses. SLA misses are also recorded in the database
-        for future reference. All tasks that share the same SLA time
-        get bundled in a single email, sent soon after that time. SLA
-        notification are sent once and only once for each task instance.
+    :param sla: Deprecated in favor of ``expected_finish``.
     :type sla: datetime.timedelta
+    :param expected_duration: the maximum duration the task is allowed to take,
+        provided as a ``timedelta``. This ``timedelta`` is relative to when the
+        task actually starts running, not to the execution date. It includes
+        any task retries, similar to the ``execution_timeout`` parameter.
+        SLA misses are stored in the database, and then SLA miss callbacks
+        are triggered. The default SLA miss callback is to email task owners.
+        Note that SLA parameters do not influence the scheduler, so you should
+        set appropriate SLA parameters given your DAG's schedule. For example,
+        setting an ``expected_start`` of ``timedelta(hours=2)`` on a task that
+        depends on a ``TimeDeltaSensor`` with ``timedelta(hours=3)`` would be
+        expected to always cause an SLA miss.
+    :type expected_duration: datetime.timedelta
+    :param expected_start: time by which the task is expected to start,
+        provided as a ``timedelta`` relative to the expected start time of
+        this task's DAG. For instance, if you set an ``expected_start`` of
+        ``timedelta(hours=1)`` on a task inside of a DAG that runs on a

Review comment:
       ```suggestion
           ``timedelta(hours=1)`` on a task inside a DAG that runs on a
   ```

##########
File path: airflow/models/baseoperator.py
##########
@@ -179,18 +180,49 @@ class derived from this one results in the creation of a task object,
     :param pool_slots: the number of pool slots this task should use (>= 1)
         Values less than 1 are not allowed.
     :type pool_slots: int
-    :param sla: time by which the job is expected to succeed. Note that
-        this represents the ``timedelta`` after the period is closed. For
-        example if you set an SLA of 1 hour, the scheduler would send an email
-        soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
-        has not succeeded yet.
-        The scheduler pays special attention for jobs with an SLA and
-        sends alert
-        emails for sla misses. SLA misses are also recorded in the database
-        for future reference. All tasks that share the same SLA time
-        get bundled in a single email, sent soon after that time. SLA
-        notification are sent once and only once for each task instance.
+    :param sla: Deprecated in favor of ``expected_finish``.
     :type sla: datetime.timedelta
+    :param expected_duration: the maximum duration the task is allowed to take,
+        provided as a ``timedelta``. This ``timedelta`` is relative to when the
+        task actually starts running, not to the execution date. It includes
+        any task retries, similar to the ``execution_timeout`` parameter.
+        SLA misses are stored in the database, and then SLA miss callbacks
+        are triggered. The default SLA miss callback is to email task owners.
+        Note that SLA parameters do not influence the scheduler, so you should
+        set appropriate SLA parameters given your DAG's schedule. For example,
+        setting an ``expected_start`` of ``timedelta(hours=2)`` on a task that
+        depends on a ``TimeDeltaSensor`` with ``timedelta(hours=3)`` would be
+        expected to always cause an SLA miss.

Review comment:
       This applies to `expected_start`, not `expected_duration`.

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types

Review comment:
       Can we leave out things like `__future__` and `six`? We don't support Python 2 anymore.

##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):
+            sla_param_errs.append("expected_duration must be a timedelta, "
+                                  "got: {}".format(expected_duration))
+        if expected_start and not isinstance(expected_start, timedelta):
+            sla_param_errs.append("expected_start must be a timedelta, "
+                                  "got: {}".format(expected_start))

Review comment:
       ```suggestion
                                     "got: {}".format(type(expected_start)))
   ```

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       Can we store this script in, say, `airflow/sla.py`? It holds functions essential to the SLA functionality, not "utilities".

##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):
+            sla_param_errs.append("expected_duration must be a timedelta, "
+                                  "got: {}".format(expected_duration))

Review comment:
       Shouldn't we print the type here?
   ```suggestion
                                     "got: {}".format(type(expected_duration)))
   ```

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(

Review comment:
       Let's use the `%s` formatting here

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]

Review comment:
       Agree, 100 seems like quite a lot too.

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):

Review comment:
       Where does `ts` come from?

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.

Review comment:
       +1!

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):

Review comment:
       Can we add a type hint to sla_misses?

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):
+        """
+        Given a list of SLA misses, send emails and/or do SLA miss callback.
+        """
+        if not sla_misses:
+            self.log.warning("send_sla_notifications was called without any "
+                             "SLA notifications to send!")
+            return
+
+        # Retrieve the context for this TI, but patch in the SLA miss object.
+        for sla_miss in sla_misses:
+            if sla_miss.notification_sent:
+                self.log.debug("SLA miss %s has already had a notification sent, "
+                               "ignoring.", sla_miss)
+
+            TI = TaskInstance
+            ti = session.query(TI).filter(
+                TI.dag_id == sla_miss.dag_id,
+                TI.task_id == sla_miss.task_id,
+                TI.execution_date == sla_miss.execution_date,
+            ).all()
+
+            # Use the TI if found
+            task = self.get_task(sla_miss.task_id)
+            if ti:
+                ti = ti.pop()
+                ti.task = task
+            # Else make a temporary one.
+            else:
+                ti = TaskInstance(task, sla_miss.execution_date)
+                ti.task = task
+
+            notification_sent = False
+            # If no callback exists, we don't want to send any notification;
+            # but we do want to update the SlaMiss in the database so that it
+            # doesn't keep looping.
+            if not task.sla_miss_callback:
+                notification_sent = True
+            else:
+                self.log.info("Calling sla_miss_callback for %s", sla_miss)
+                try:
+                    # Patch context with the current SLA miss.
+                    context = ti.get_template_context()
+                    context["sla_miss"] = sla_miss
+                    task.sla_miss_callback(context)
+                    notification_sent = True

Review comment:
       Why use an additional variable? Instead of setting it directly on `sla_miss`?

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):
+    """
+    Yield new DagRuns that haven't been created yet. This functionality is
+    important to SLA misses because it is possible for the scheduler to fall
+    so far behind that it cannot create a DAGRun when it is supposed to (like
+    if it is offline, or if there are strict concurrency limits). We need to
+    understand and alert on what DAGRuns *should* have been created by this
+    point in time.
+    """
+
+    # TODO: A lot of this logic is duplicated from the scheduler. It would
+    # be better to have one function that yields upcoming DAG runs in a
+    # consistent way that is usable for both use cases.
+
+    # Start by assuming that there is no next run.
+    next_run_date = None
+
+    # The first DAGRun has not been created yet.
+    if not last_scheduled_run:
+        task_start_dates = [t.start_date for t in dag.tasks]
+        if task_start_dates:
+            next_run_date = dag.normalize_schedule(min(task_start_dates))
+    # The DagRun is @once and has already happened.
+    elif dag.schedule_interval == '@once':
+        return
+    # Start from the next "normal" run.
+    else:
+        next_run_date = dag.following_schedule(last_scheduled_run.execution_date)
+
+    while True:
+        # There should be a next execution.
+        if not next_run_date:
+            return
+
+        # The next execution shouldn't be in the future.
+        if next_run_date > ts:
+            return
+
+        # The next execution shouldn't be beyond the DAG's end date.
+        # n.b. - tasks have their own end dates checked later
+        if next_run_date and dag.end_date and next_run_date > dag.end_date:
+            return
+
+        # Calculate the end of this execution period.
+        if dag.schedule_interval == '@once':
+            period_end = next_run_date
+        else:
+            period_end = dag.following_schedule(next_run_date)
+
+        # The next execution shouldn't still be mid-period.
+        if period_end > ts:
+            return
+
+        # We've passed every filter; this is a valid future DagRun that
+        # presumably hasn't been scheduled due to concurrency limits.
+        # Create and yield a fake DAGRun, which won't exist in the db yet.
+        next_run = airflow.models.DagRun(
+            dag_id=dag.dag_id,
+            run_id='manual__' + next_run_date.isoformat(),
+            execution_date=next_run_date,
+            start_date=ts,
+            state=State.NONE,
+            external_trigger=False,
+        )
+        next_run.dag = dag
+        yield next_run
+
+        # Examine the next date.
+        next_run_date = dag.following_schedule(next_run_date)
+
+
+def yield_unscheduled_tis(dag_run, ts):

Review comment:
       same here: rename to e.g. "yield_uncreated_tis" and docstring for `ts`?

##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):
+            sla_param_errs.append("expected_duration must be a timedelta, "
+                                  "got: {}".format(expected_duration))
+        if expected_start and not isinstance(expected_start, timedelta):
+            sla_param_errs.append("expected_start must be a timedelta, "
+                                  "got: {}".format(expected_start))
+        if expected_finish and not isinstance(expected_finish, timedelta):
+            sla_param_errs.append("expected_finish must be a timedelta, "
+                                  "got: {}".format(expected_finish))

Review comment:
       ```suggestion
                                     "got: {}".format(type(expected_finish)))
   ```

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.

Review comment:
       full?

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):
+        """
+        Given a list of SLA misses, send emails and/or do SLA miss callback.
+        """
+        if not sla_misses:
+            self.log.warning("send_sla_notifications was called without any "
+                             "SLA notifications to send!")
+            return
+
+        # Retrieve the context for this TI, but patch in the SLA miss object.
+        for sla_miss in sla_misses:
+            if sla_miss.notification_sent:
+                self.log.debug("SLA miss %s has already had a notification sent, "
+                               "ignoring.", sla_miss)
+
+            TI = TaskInstance
+            ti = session.query(TI).filter(
+                TI.dag_id == sla_miss.dag_id,
+                TI.task_id == sla_miss.task_id,
+                TI.execution_date == sla_miss.execution_date,
+            ).all()
+
+            # Use the TI if found
+            task = self.get_task(sla_miss.task_id)
+            if ti:
+                ti = ti.pop()
+                ti.task = task
+            # Else make a temporary one.
+            else:
+                ti = TaskInstance(task, sla_miss.execution_date)
+                ti.task = task
+
+            notification_sent = False
+            # If no callback exists, we don't want to send any notification;
+            # but we do want to update the SlaMiss in the database so that it
+            # doesn't keep looping.
+            if not task.sla_miss_callback:
+                notification_sent = True
+            else:
+                self.log.info("Calling sla_miss_callback for %s", sla_miss)
+                try:
+                    # Patch context with the current SLA miss.
+                    context = ti.get_template_context()
+                    context["sla_miss"] = sla_miss
+                    task.sla_miss_callback(context)
+                    notification_sent = True
+                    self.log.debug("Called sla_miss_callback for %s", sla_miss)
+                except Exception:
+                    self.log.exception(
+                        "Could not call sla_miss_callback for DAG %s",
+                        self.dag_id
+                    )

Review comment:
       Should we log the exception itself here?

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):
+    """
+    Yield new DagRuns that haven't been created yet. This functionality is
+    important to SLA misses because it is possible for the scheduler to fall
+    so far behind that it cannot create a DAGRun when it is supposed to (like
+    if it is offline, or if there are strict concurrency limits). We need to
+    understand and alert on what DAGRuns *should* have been created by this
+    point in time.
+    """
+
+    # TODO: A lot of this logic is duplicated from the scheduler. It would
+    # be better to have one function that yields upcoming DAG runs in a
+    # consistent way that is usable for both use cases.
+
+    # Start by assuming that there is no next run.
+    next_run_date = None
+
+    # The first DAGRun has not been created yet.
+    if not last_scheduled_run:
+        task_start_dates = [t.start_date for t in dag.tasks]
+        if task_start_dates:
+            next_run_date = dag.normalize_schedule(min(task_start_dates))
+    # The DagRun is @once and has already happened.
+    elif dag.schedule_interval == '@once':
+        return
+    # Start from the next "normal" run.
+    else:
+        next_run_date = dag.following_schedule(last_scheduled_run.execution_date)
+
+    while True:
+        # There should be a next execution.
+        if not next_run_date:
+            return
+
+        # The next execution shouldn't be in the future.
+        if next_run_date > ts:
+            return
+
+        # The next execution shouldn't be beyond the DAG's end date.
+        # n.b. - tasks have their own end dates checked later
+        if next_run_date and dag.end_date and next_run_date > dag.end_date:
+            return
+
+        # Calculate the end of this execution period.
+        if dag.schedule_interval == '@once':
+            period_end = next_run_date
+        else:
+            period_end = dag.following_schedule(next_run_date)
+
+        # The next execution shouldn't still be mid-period.
+        if period_end > ts:
+            return
+
+        # We've passed every filter; this is a valid future DagRun that
+        # presumably hasn't been scheduled due to concurrency limits.
+        # Create and yield a fake DAGRun, which won't exist in the db yet.
+        next_run = airflow.models.DagRun(
+            dag_id=dag.dag_id,
+            run_id='manual__' + next_run_date.isoformat(),
+            execution_date=next_run_date,
+            start_date=ts,
+            state=State.NONE,
+            external_trigger=False,
+        )
+        next_run.dag = dag
+        yield next_run
+
+        # Examine the next date.
+        next_run_date = dag.following_schedule(next_run_date)
+
+
+def yield_unscheduled_tis(dag_run, ts):
+    """
+    Given an unscheduled `DagRun`, yield any unscheduled TIs that will exist
+    for it in the future, respecting the end date of the DAG and task. See note
+    above for why this is important for SLA notifications.
+    """
+    for task in dag_run.dag.tasks:
+        end_dates = []
+        if dag_run.dag.end_date:
+            end_dates.append(dag_run.dag.end_date)
+        if task.end_date:
+            end_dates.append(task.end_date)
+
+        # Create TIs if there is no end date, or it hasn't happened yet.
+        if not end_dates or ts < min(end_dates):
+            yield airflow.models.TaskInstance(task, dag_run.execution_date)
+
+
+def get_sla_misses(ti, session):
+    """
+    Get all SLA misses that match a particular TaskInstance. There may be
+    several matches if the Task has several independent SLAs.
+    """
+    SM = airflow.models.SlaMiss
+    return session.query(SM).filter(
+        SM.dag_id == ti.dag_id,
+        SM.task_id == ti.task_id,
+        SM.execution_date == ti.execution_date
+    ).all()
+
+
+def create_sla_misses(ti, timestamp, session):
+    """
+    Determine whether a TaskInstance has missed any SLAs as of a provided
+    timestamp. If it has, create `SlaMiss` objects in the provided session.
+    Note that one TaskInstance can have multiple SLA miss objects: for example,
+    it can both start late and run longer than expected.
+    """
+    # Skipped task instances will never trigger SLAs because they
+    # were intentionally not scheduled. Though, it's still a valid and
+    # interesting SLA miss if a task that's *going* to be skipped today is
+    # late! That could mean that an upstream task is hanging.
+    if ti.state == State.SKIPPED:
+        return
+
+    log.debug("Calculating SLA misses for %s as of %s", ti, timestamp)
+
+    SM = airflow.models.SlaMiss
+
+    # Get existing SLA misses for this task instance.
+    ti_misses = {sm.sla_type: sm for sm in get_sla_misses(ti, session)}

Review comment:
       We never use the values of this dict. How about we create a set of the sla_types and rename for more clarity:
   ```suggestion
       existing_sla_miss_types = {sm.sla_type for sm in get_sla_misses(ti, session)}
   ```

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):
+        """
+        Given a list of SLA misses, send emails and/or do SLA miss callback.
+        """
+        if not sla_misses:
+            self.log.warning("send_sla_notifications was called without any "
+                             "SLA notifications to send!")
+            return
+
+        # Retrieve the context for this TI, but patch in the SLA miss object.
+        for sla_miss in sla_misses:
+            if sla_miss.notification_sent:
+                self.log.debug("SLA miss %s has already had a notification sent, "
+                               "ignoring.", sla_miss)
+
+            TI = TaskInstance
+            ti = session.query(TI).filter(
+                TI.dag_id == sla_miss.dag_id,
+                TI.task_id == sla_miss.task_id,
+                TI.execution_date == sla_miss.execution_date,
+            ).all()
+
+            # Use the TI if found
+            task = self.get_task(sla_miss.task_id)
+            if ti:
+                ti = ti.pop()
+                ti.task = task
+            # Else make a temporary one.
+            else:
+                ti = TaskInstance(task, sla_miss.execution_date)
+                ti.task = task

Review comment:
       This logic is confusing to me?
   
   So `ti` should hold all task instances (which I assume is one single task instance at most because we filter on dag_id, task_id and execution_date). If `task_id` is known to the DAG, we pop from `ti` and replace it with the `task_id` from the DAG. But no guarantee we're replacing the same TI?
   
   Else, we create a new task instance? But why is this, is this case even possible?
   
   What is this code for?

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):

Review comment:
       Can we rename to something more meaningful, e.g. "yield_uncreated_runs"?

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):
+    """
+    Yield new DagRuns that haven't been created yet. This functionality is
+    important to SLA misses because it is possible for the scheduler to fall
+    so far behind that it cannot create a DAGRun when it is supposed to (like
+    if it is offline, or if there are strict concurrency limits). We need to
+    understand and alert on what DAGRuns *should* have been created by this
+    point in time.
+    """
+
+    # TODO: A lot of this logic is duplicated from the scheduler. It would
+    # be better to have one function that yields upcoming DAG runs in a
+    # consistent way that is usable for both use cases.
+
+    # Start by assuming that there is no next run.
+    next_run_date = None
+
+    # The first DAGRun has not been created yet.
+    if not last_scheduled_run:
+        task_start_dates = [t.start_date for t in dag.tasks]
+        if task_start_dates:
+            next_run_date = dag.normalize_schedule(min(task_start_dates))
+    # The DagRun is @once and has already happened.
+    elif dag.schedule_interval == '@once':
+        return
+    # Start from the next "normal" run.
+    else:
+        next_run_date = dag.following_schedule(last_scheduled_run.execution_date)
+
+    while True:
+        # There should be a next execution.
+        if not next_run_date:
+            return
+
+        # The next execution shouldn't be in the future.
+        if next_run_date > ts:
+            return
+
+        # The next execution shouldn't be beyond the DAG's end date.
+        # n.b. - tasks have their own end dates checked later
+        if next_run_date and dag.end_date and next_run_date > dag.end_date:
+            return
+
+        # Calculate the end of this execution period.
+        if dag.schedule_interval == '@once':
+            period_end = next_run_date
+        else:
+            period_end = dag.following_schedule(next_run_date)
+
+        # The next execution shouldn't still be mid-period.
+        if period_end > ts:
+            return
+
+        # We've passed every filter; this is a valid future DagRun that
+        # presumably hasn't been scheduled due to concurrency limits.
+        # Create and yield a fake DAGRun, which won't exist in the db yet.
+        next_run = airflow.models.DagRun(
+            dag_id=dag.dag_id,
+            run_id='manual__' + next_run_date.isoformat(),
+            execution_date=next_run_date,
+            start_date=ts,
+            state=State.NONE,
+            external_trigger=False,
+        )
+        next_run.dag = dag
+        yield next_run
+
+        # Examine the next date.
+        next_run_date = dag.following_schedule(next_run_date)
+
+
+def yield_unscheduled_tis(dag_run, ts):
+    """
+    Given an unscheduled `DagRun`, yield any unscheduled TIs that will exist
+    for it in the future, respecting the end date of the DAG and task. See note
+    above for why this is important for SLA notifications.
+    """
+    for task in dag_run.dag.tasks:
+        end_dates = []
+        if dag_run.dag.end_date:
+            end_dates.append(dag_run.dag.end_date)
+        if task.end_date:
+            end_dates.append(task.end_date)
+
+        # Create TIs if there is no end date, or it hasn't happened yet.
+        if not end_dates or ts < min(end_dates):
+            yield airflow.models.TaskInstance(task, dag_run.execution_date)
+
+
+def get_sla_misses(ti, session):
+    """
+    Get all SLA misses that match a particular TaskInstance. There may be
+    several matches if the Task has several independent SLAs.
+    """
+    SM = airflow.models.SlaMiss
+    return session.query(SM).filter(
+        SM.dag_id == ti.dag_id,
+        SM.task_id == ti.task_id,
+        SM.execution_date == ti.execution_date
+    ).all()
+
+
+def create_sla_misses(ti, timestamp, session):
+    """
+    Determine whether a TaskInstance has missed any SLAs as of a provided
+    timestamp. If it has, create `SlaMiss` objects in the provided session.
+    Note that one TaskInstance can have multiple SLA miss objects: for example,
+    it can both start late and run longer than expected.
+    """
+    # Skipped task instances will never trigger SLAs because they
+    # were intentionally not scheduled. Though, it's still a valid and
+    # interesting SLA miss if a task that's *going* to be skipped today is
+    # late! That could mean that an upstream task is hanging.
+    if ti.state == State.SKIPPED:
+        return
+
+    log.debug("Calculating SLA misses for %s as of %s", ti, timestamp)
+
+    SM = airflow.models.SlaMiss
+
+    # Get existing SLA misses for this task instance.
+    ti_misses = {sm.sla_type: sm for sm in get_sla_misses(ti, session)}
+
+    # Calculate SLA misses that don't already exist. Wrapping exceptions here
+    # is important so that an exception in one type of SLA doesn't
+    # prevent other task SLAs from getting triggered.
+
+    # SLA Miss for Expected Duration
+    # n.b. - this one can't be calculated until the ti has started!
+    if SM.TASK_DURATION_EXCEEDED not in ti_misses \
+            and ti.task.expected_duration and ti.start_date:
+        try:
+            if ti.state in State.finished():
+                duration = ti.end_date - ti.start_date
+            else:
+                # Use the current time, if the task is still running.
+                duration = timestamp - ti.start_date
+
+            if duration > ti.task.expected_duration:
+                log.debug("Task instance %s's duration of %s > its expected "
+                          "duration of %s. Creating duration exceeded SLA miss.",
+                          ti, duration, ti.task.expected_duration)
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_DURATION_EXCEEDED,
+                    timestamp=timestamp))
+            else:
+                log.debug("Task instance %s's duration of %s <= its expected "
+                          "duration of %s, SLA not yet missed.",
+                          ti, duration, ti.task.expected_duration)
+        except Exception:  # pylint: disable=broad-except
+            log.exception(
+                "Failed to calculate expected duration SLA miss for "
+                "task instance %s",
+                ti
+            )
+
+    # SLA Miss for Expected Start
+    if SM.TASK_LATE_START not in ti_misses and ti.task.expected_start:
+        try:
+            # If a TI's exc date is 01-01-2018, we expect it to start by the next
+            # execution date (01-02-2018) plus a delta of expected_start.
+            expected_start = ti.task.dag.following_schedule(ti.execution_date)
+            expected_start += ti.task.expected_start
+
+            # The case where we have started the ti, but late
+            if ti.start_date and ti.start_date > expected_start:
+                log.debug("Task instance %s's actual start %s > its expected "
+                          "start of %s. Creating late start SLA miss.",
+                          ti, ti.start_date, expected_start)
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_START,
+                    timestamp=timestamp))
+
+            # The case where we haven't even started the ti yet
+            elif timestamp > expected_start:
+                log.debug("Task instance %s has not started by its expected "
+                          "start of %s. Creating late start SLA miss.",
+                          ti, expected_start)
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_START,
+                    timestamp=timestamp))
+            else:
+                log.debug("Task instance %s's expected start of %s hasn't "
+                          "happened yet, SLA not yet missed.",
+                          ti, expected_start)
+        except Exception:  # pylint: disable=broad-except
+            log.exception(
+                "Failed to calculate expected start SLA miss for "
+                "task instance %s",
+                ti
+            )
+
+    # SLA Miss for Expected Finish
+    if SM.TASK_LATE_FINISH not in ti_misses and ti.task.expected_finish:
+        try:
+            # If a TI's exc date is 01-01-2018, we expect it to finish by the next
+            # execution date (01-02-2018) plus a delta of expected_finish.
+            expected_finish = ti.task.dag.following_schedule(ti.execution_date)
+            expected_finish += ti.task.expected_finish
+
+            if ti.end_date and ti.end_date > expected_finish:
+                log.debug("Task instance %s's actual finish %s > its expected "
+                          "finish of %s. Creating late finish SLA miss.",
+                          ti, ti.end_date, expected_finish)
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_FINISH,
+                    timestamp=timestamp))
+
+            elif timestamp > expected_finish:
+                log.debug("Task instance %s has not finished by its expected "
+                          "finish of %s. Creating late finish SLA miss.",
+                          ti, expected_finish)
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_FINISH,
+                    timestamp=timestamp))
+            else:
+                log.debug("Task instance %s's expected finish of %s hasn't "
+                          "happened yet, SLA not yet missed.",
+                          ti, expected_finish)
+        except Exception:  # pylint: disable=broad-except
+            log.exception(
+                "Failed to calculate expected finish SLA miss for "
+                "task instance %s",
+                ti
+            )
+
+
+def send_sla_miss_email(context):
+    """
+    Send an SLA miss email. This is the default SLA miss callback.
+    """
+    sla_miss = context["sla_miss"]
+
+    if sla_miss.sla_type == sla_miss.TASK_DURATION_EXCEEDED:
+        email_function = send_task_duration_exceeded_email
+    elif sla_miss.sla_type == sla_miss.TASK_LATE_START:
+        email_function = send_task_late_start_email
+    elif sla_miss.sla_type == sla_miss.TASK_LATE_FINISH:
+        email_function = send_task_late_finish_email
+    else:
+        log.warning("Received unexpected SLA Miss type: %s", sla_miss.sla_type)
+        return
+
+    email_to, email_subject, email_body = email_function(context)
+    send_email(email_to, email_subject, email_body)
+
+
+def describe_task_instance(ti):
+    """
+    Return a string representation of the task instance.
+    """
+    return "{dag_id}.{task_id} [{exc_date}]".format(
+        dag_id=ti.dag_id,
+        task_id=ti.task_id,
+        exc_date=ti.execution_date
+    )

Review comment:
       TaskInstance has `repr()` for this. Let's remove.

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):

Review comment:
       Can we have a docstring to tell the meaning of `ts`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org