Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/24 17:45:07 UTC

[GitHub] [airflow] seanxwzhang opened a new pull request #8545: Xzhang/airflow sla refactoring

seanxwzhang opened a new pull request #8545:
URL: https://github.com/apache/airflow/pull/8545


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Target Github ISSUE in description if exists
   - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] seanxwzhang commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-623290340


   Addressed most, if not all, comments in the previous round of review. Played a bit with two cases regarding how we fetch DagRuns for SLA consideration:
   
   1. Use a fixed number (e.g., 100) for fetching DRs
   2. Add an *sla_checked* column to DR and use it to filter out DRs that have already been checked.
   
   My conclusion is that option 1 is a better trade-off, because one has to go through all TIs in a DagRun to determine whether a DR can be freed from further checking (e.g., if a DR has 10 TIs, then each TI has to be checked for all possible SLA violations before the DR is **sla_checked**). This is not a cheap operation, since a single TI could have 3 SLAs, so the additional computation and IO could easily outweigh the benefit of filtering out *sla_checked* DRs.
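   To make the trade-off concrete, here is a rough sketch of the bookkeeping option 2 would need (the `sla_checked` column and both helpers below are hypothetical, not code from this PR). A run can only be flagged after walking every one of its TIs, each of which may carry up to three SLA parameters:
   
    ```python
    from airflow.utils.state import State


    def can_skip_future_checks(ti) -> bool:
        # Assumed rule for this sketch: a finished TI no longer needs SLA
        # checks, because any misses would already have been recorded.
        return ti.state in (State.SUCCESS, State.SKIPPED)


    def maybe_mark_sla_checked(dag_run, task_instances, session):
        # Option 2 can only flag the run after inspecting *all* of its TIs.
        if all(can_skip_future_checks(ti) for ti in task_instances):
            dag_run.sla_checked = True  # hypothetical column, not in this PR
            session.merge(dag_run)
    ```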
   





[GitHub] [airflow] seanxwzhang commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r418201150



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -750,7 +600,16 @@ def _process_dags(self, dags: List[DAG], session=None):
             if dag_runs_for_dag:
                 tis_out.extend(self._process_task_instances(dag, dag_runs_for_dag))
                 if check_slas:
-                    self.manage_slas(dag)
+                    # Look for SLA misses for all task instances associated with this
+                    # DAG, whether or not corresponding runs have been created yet.
+                    # Catch exceptions, though; we don't want SLA checking failures to
+                    # prevent tasks from being scheduled!
+                    try:
+                        dag.manage_slas()
+                    except Exception:  # pylint: disable=broad-except

Review comment:
       good idea!







[GitHub] [airflow] houqp commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r417546005



##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]

Review comment:
       Yeah, agreed. Perhaps for every DAG run created, we could add a "to check" entry in a separate DAG SLA table; every checked DAG run could then be removed from that table to keep its size under control.
   
   If we are not going to implement the filter optimization within this PR, we should at least move the "grab the last 100 entries" logic from Python into the DB query.
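       For example, a minimal sketch of pushing that limit into the database (illustrative only; it reuses the `DagRun` model and `provide_session` from this PR, and the limit of 100 is just the current placeholder value):

    ```python
    from airflow.models import DagRun
    from airflow.utils.session import provide_session


    @provide_session
    def last_scheduled_dagruns(dag_id, limit=100, session=None):
        # Let the database return only the most recent scheduled runs instead
        # of loading every DagRun and slicing the list in Python.
        return (
            session.query(DagRun)
            .filter(DagRun.dag_id == dag_id)
            .filter(DagRun.external_trigger == False)  # noqa: E712
            .order_by(DagRun.execution_date.desc())
            .limit(limit)
            .all()
        )
    ```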







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #8545: Xzhang/airflow sla refactoring

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-619155687


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://apache-airflow-slack.herokuapp.com/
   





[GitHub] [airflow] seanxwzhang commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-631691752


   Made the requested changes; please take a look again :) @houqp @BasPH thanks





[GitHub] [airflow] smith-m edited a comment on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
smith-m edited a comment on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-696458546


   Is development on the task-level SLA miss callback active? I may be able to help





[GitHub] [airflow] seanxwzhang commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r428375456



##########
File path: airflow/models/dagrun.py
##########
@@ -491,6 +491,25 @@ def is_backfill(self):
             self.run_id.startswith(f"{DagRunType.BACKFILL_JOB.value}")
         )
 
+    @classmethod
+    @provide_session
+    def get_latest_run(cls, dag_id, session):
+        """Returns the latest DagRun for a given dag_id"""
+        subquery = (
+            session
+            .query(func.max(cls.execution_date).label('execution_date'))
+            .filter(cls.dag_id == dag_id)
+            .subquery()
+        )
+        dagrun = (

Review comment:
       They can, but since we don't have an index on execution_date, the order_by will translate to a sort on Postgres, and thus it may be more expensive (at least on Postgres):
   ```
   airflow=# explain select max(execution_date) from dag_run;
                             QUERY PLAN                           
   ---------------------------------------------------------------
    Aggregate  (cost=10.75..10.76 rows=1 width=8)
      ->  Seq Scan on dag_run  (cost=0.00..10.60 rows=60 width=8)
   (2 rows)
   
   airflow=# explain select execution_date from dag_run order by execution_date desc limit 1;
                                QUERY PLAN                              
   ---------------------------------------------------------------------
    Limit  (cost=10.90..10.90 rows=1 width=8)
      ->  Sort  (cost=10.90..11.05 rows=60 width=8)
            Sort Key: execution_date DESC
            ->  Seq Scan on dag_run  (cost=0.00..10.60 rows=60 width=8)
   (4 rows)
   ```







[GitHub] [airflow] houqp commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r428324933



##########
File path: airflow/models/dagrun.py
##########
@@ -491,6 +491,25 @@ def is_backfill(self):
             self.run_id.startswith(f"{DagRunType.BACKFILL_JOB.value}")
         )
 
+    @classmethod
+    @provide_session
+    def get_latest_run(cls, dag_id, session):
+        """Returns the latest DagRun for a given dag_id"""
+        subquery = (
+            session
+            .query(func.max(cls.execution_date).label('execution_date'))
+            .filter(cls.dag_id == dag_id)
+            .subquery()
+        )
+        dagrun = (

Review comment:
       can these two queries be combined into one using order by and limit?
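       Presumably something along these lines (a sketch only; whether it actually beats the subquery depends on indexing, as discussed further down in this thread):

    ```python
    from airflow.models.dagrun import DagRun


    def get_latest_run(dag_id, session):
        # Single query: sort by execution_date and take the newest row,
        # instead of a max() subquery followed by a second lookup.
        return (
            session.query(DagRun)
            .filter(DagRun.dag_id == dag_id)
            .order_by(DagRun.execution_date.desc())
            .first()
        )
    ```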








[GitHub] [airflow] houqp commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r417535040



##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):

Review comment:
       Since those arguments already have mypy type annotations, do we still need to check them at runtime?







[GitHub] [airflow] smith-m commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
smith-m commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-696458546


   Is development on the task-level SLA miss callback active?





[GitHub] [airflow] dimberman commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
dimberman commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-635997051


   Hi @seanxwzhang, we've fixed the k8s tests; can you rebase?
   
   @BasPH does this look good to you?





[GitHub] [airflow] seanxwzhang commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r415941751



##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]

Review comment:
       I'm thinking maybe we should keep a "latest checked execution date" and merge this into the query above, i.e., only fetch DAG runs that haven't been processed yet; otherwise we lose performance over time by querying more and more DAG runs. Thoughts?
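       A rough sketch of that idea (the `last_sla_checked_date` watermark below is hypothetical, not part of this PR):

    ```python
    from airflow.models import DagRun


    def unchecked_dagruns(dag_id, last_sla_checked_date, session):
        # Only fetch runs newer than the last execution_date we already
        # checked, so the query stops growing with the DAG's history.
        query = (
            session.query(DagRun)
            .filter(DagRun.dag_id == dag_id)
            .filter(DagRun.external_trigger == False)  # noqa: E712
        )
        if last_sla_checked_date is not None:
            query = query.filter(DagRun.execution_date > last_sla_checked_date)
        return query.order_by(DagRun.execution_date.asc()).all()
    ```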







[GitHub] [airflow] houqp edited a comment on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp edited a comment on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-626404118


   If we can add `.filter(DR.state != State.SUCCESS)` to the query filter list, then I am also in favor of option 1 (without the fixed limit of 100, of course). It's simpler than option 2 and shouldn't run into performance issues for the majority of use cases :)





[GitHub] [airflow] houqp commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r417532128



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -750,7 +600,16 @@ def _process_dags(self, dags: List[DAG], session=None):
             if dag_runs_for_dag:
                 tis_out.extend(self._process_task_instances(dag, dag_runs_for_dag))
                 if check_slas:
-                    self.manage_slas(dag)
+                    # Look for SLA misses for all task instances associated with this
+                    # DAG, whether or not corresponding runs have been created yet.
+                    # Catch exceptions, though; we don't want SLA checking failures to
+                    # prevent tasks from being scheduled!
+                    try:
+                        dag.manage_slas()
+                    except Exception:  # pylint: disable=broad-except

Review comment:
       Since we will be using SLAs for critical monitoring, I think we should do more than just log the exception. How about adding a metric for this, so one can set up alerting for issues that impact SLA monitoring?
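       For instance, roughly (a sketch only; `Stats.incr` exists in Airflow, but the metric name here is made up):

    ```python
    from airflow.stats import Stats


    def _manage_slas_safely(dag, log):
        """Sketch of the scheduler-side call with a failure metric."""
        try:
            dag.manage_slas()
        except Exception:  # pylint: disable=broad-except
            # Count failures so operators can alert when SLA checking itself
            # is broken, instead of relying on someone reading the logs.
            Stats.incr('dag.sla_check_failures')  # illustrative metric name
            log.exception("Error while managing SLAs for DAG %s", dag.dag_id)
    ```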







[GitHub] [airflow] seanxwzhang commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r418404338



##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       sure, that makes more sense

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):

Review comment:
       sure

##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):

Review comment:
       You are right, we don't. The original PR was written at a time when Airflow was still compatible with Python 2. :)

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):
+    """
+    Yield new DagRuns that haven't been created yet. This functionality is
+    important to SLA misses because it is possible for the scheduler to fall
+    so far behind that it cannot create a DAGRun when it is supposed to (like
+    if it is offline, or if there are strict concurrency limits). We need to
+    understand and alert on what DAGRuns *should* have been created by this
+    point in time.
+    """
+
+    # TODO: A lot of this logic is duplicated from the scheduler. It would
+    # be better to have one function that yields upcoming DAG runs in a
+    # consistent way that is usable for both use cases.
+
+    # Start by assuming that there is no next run.
+    next_run_date = None
+
+    # The first DAGRun has not been created yet.
+    if not last_scheduled_run:
+        task_start_dates = [t.start_date for t in dag.tasks]
+        if task_start_dates:
+            next_run_date = dag.normalize_schedule(min(task_start_dates))
+    # The DagRun is @once and has already happened.
+    elif dag.schedule_interval == '@once':
+        return
+    # Start from the next "normal" run.
+    else:
+        next_run_date = dag.following_schedule(last_scheduled_run.execution_date)
+
+    while True:
+        # There should be a next execution.
+        if not next_run_date:
+            return
+
+        # The next execution shouldn't be in the future.
+        if next_run_date > ts:
+            return
+
+        # The next execution shouldn't be beyond the DAG's end date.
+        # n.b. - tasks have their own end dates checked later
+        if next_run_date and dag.end_date and next_run_date > dag.end_date:
+            return
+
+        # Calculate the end of this execution period.
+        if dag.schedule_interval == '@once':
+            period_end = next_run_date
+        else:
+            period_end = dag.following_schedule(next_run_date)
+
+        # The next execution shouldn't still be mid-period.
+        if period_end > ts:
+            return
+
+        # We've passed every filter; this is a valid future DagRun that
+        # presumably hasn't been scheduled due to concurrency limits.
+        # Create and yield a fake DAGRun, which won't exist in the db yet.
+        next_run = airflow.models.DagRun(
+            dag_id=dag.dag_id,
+            run_id='manual__' + next_run_date.isoformat(),
+            execution_date=next_run_date,
+            start_date=ts,
+            state=State.NONE,
+            external_trigger=False,
+        )
+        next_run.dag = dag
+        yield next_run
+
+        # Examine the next date.
+        next_run_date = dag.following_schedule(next_run_date)
+
+
+def yield_unscheduled_tis(dag_run, ts):

Review comment:
       done









[GitHub] [airflow] houqp commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r428389970



##########
File path: airflow/models/dagrun.py
##########
@@ -491,6 +491,25 @@ def is_backfill(self):
             self.run_id.startswith(f"{DagRunType.BACKFILL_JOB.value}")
         )
 
+    @classmethod
+    @provide_session
+    def get_latest_run(cls, dag_id, session):
+        """Returns the latest DagRun for a given dag_id"""
+        subquery = (
+            session
+            .query(func.max(cls.execution_date).label('execution_date'))
+            .filter(cls.dag_id == dag_id)
+            .subquery()
+        )
+        dagrun = (

Review comment:
       Ha, OK, I didn't know the execution_date column is not indexed; that's surprising. In that case, the subquery is better.








[GitHub] [airflow] houqp commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r417536580



##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):
+            sla_param_errs.append("expected_duration must be a timedelta, "
+                                  "got: {}".format(expected_duration))
+        if expected_start and not isinstance(expected_start, timedelta):
+            sla_param_errs.append("expected_start must be a timedelta, "
+                                  "got: {}".format(expected_start))
+        if expected_finish and not isinstance(expected_finish, timedelta):
+            sla_param_errs.append("expected_finish must be a timedelta, "
+                                  "got: {}".format(expected_finish))
+        if sla_param_errs:
+            raise AirflowException("Invalid SLA params were set! {}".format(
+                "; ".join(sla_param_errs)))
+
+        # If no exception has been raised, go ahead and set these.
+        self.expected_duration = expected_duration
+        self.expected_start = expected_start
+        self.expected_finish = expected_finish
+
+        # Warn the user if they've set any non-sensical parameter combinations
+        if self.expected_start and self.expected_finish \
+                and self.expected_start >= self.expected_finish:
+            self.log.warning(
+                "Task %s has an expected_start (%s) that occurs after its "
+                "expected_finish (%s), so it will always send an SLA "
+                "notification.",
+                self, self.expected_start, self.expected_finish
+            )
+
+            if self.expected_duration and self.expected_start \

Review comment:
       Looks like this if block has one more level of indentation than it should?

##########
File path: airflow/models/taskinstance.py
##########
@@ -412,6 +412,17 @@ def mark_success_url(self):
             "&downstream=false"
         ).format(task_id=self.task_id, dag_id=self.dag_id, iso=iso)
 
+    @property
+    def details_url(self):
+        iso = quote(self.execution_date.isoformat())
+        base_url = conf.get('webserver', 'BASE_URL')
+        return base_url + (
+            "/task"
+            "?task_id={task_id}"
+            "&dag_id={dag_id}"
+            "&execution_date={iso}"
+        ).format(task_id=self.task_id, dag_id=self.dag_id, iso=iso)

Review comment:
       minor nitpick, we can avoid creating an extra string at runtime with the following code:
   
   ```python
           return (
               "{base_url}/task"
               "?task_id={task_id}"
               "&dag_id={dag_id}"
               "&execution_date={iso}"
           ).format(base_url=base_url, task_id=self.task_id, dag_id=self.dag_id, iso=iso)
   ```

##########
File path: airflow/models/baseoperator.py
##########
@@ -1187,6 +1306,12 @@ def get_serialized_fields(cls):
 
         return cls.__serialized_fields
 
+    def has_slas(self):

Review comment:
       Nitpick: could you annotate the return type for this method as well?
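       E.g., a guess at the annotated signature (the return expression is illustrative, based on the SLA attributes this PR sets in `__init__`; the diff is cut off before the real body):

    ```python
    def has_slas(self) -> bool:
        # Illustrative body: True if any SLA parameter is set on the task.
        return bool(self.expected_duration or self.expected_start or self.expected_finish)
    ```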

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))

Review comment:
       For logging, please don't use `format()`; it will create the string at runtime regardless of the logging level. Always use `%s` placeholders with the parameters passed as arguments to the logging call.
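       I.e., the same message as in the diff above, just lazily formatted:

    ```python
    self.log.debug(
        "Found %s outstanding TIs across %s dagruns for DAG %s",
        len(scheduled_tis), len(scheduled_dagruns), self.dag_id
    )
    ```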







[GitHub] [airflow] houqp commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-625434121


   > My conclusion is that option 1 is a better trade-off, because one has to go through all TIs in a DagRun to determine if a DR can be free from further checking (e.g., if a DR has 10 TIs, then each TI has to checked for all possible SLA violations before the DR is sla_checked). This is not a cheap operation since a single TI could have 3 SLAs, hence the additional computation and IO could easily outweigh the benefit of filtering out sla_checked DRs.
   
   Option 1 doesn't guarantee correctness, right? I.e., if there are more DAG runs that need to be checked than the preset limit, some of them will be ignored?
   
   With regard to the performance comparison between option 1 and option 2, aren't we already checking all the TIs for the 100 fetched DAG runs in option 1?





[GitHub] [airflow] seanxwzhang commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-621381774


   > Thank you for taking this on. I couldn't justify any more time on it but I think it's still very relevant to the project.
   
   Happy to contribute :) 





[GitHub] [airflow] houqp commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r417535040



##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):

Review comment:
       Since those arguments already have mypy type annotations, do we still need to check them at runtime? mypy should raise these errors automatically at build time, right?







[GitHub] [airflow] seanxwzhang commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r418254475



##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):
+            sla_param_errs.append("expected_duration must be a timedelta, "
+                                  "got: {}".format(expected_duration))
+        if expected_start and not isinstance(expected_start, timedelta):
+            sla_param_errs.append("expected_start must be a timedelta, "
+                                  "got: {}".format(expected_start))
+        if expected_finish and not isinstance(expected_finish, timedelta):
+            sla_param_errs.append("expected_finish must be a timedelta, "
+                                  "got: {}".format(expected_finish))
+        if sla_param_errs:
+            raise AirflowException("Invalid SLA params were set! {}".format(
+                "; ".join(sla_param_errs)))
+
+        # If no exception has been raised, go ahead and set these.
+        self.expected_duration = expected_duration
+        self.expected_start = expected_start
+        self.expected_finish = expected_finish
+
+        # Warn the user if they've set any non-sensical parameter combinations
+        if self.expected_start and self.expected_finish \
+                and self.expected_start >= self.expected_finish:
+            self.log.warning(
+                "Task %s has an expected_start (%s) that occurs after its "
+                "expected_finish (%s), so it will always send an SLA "
+                "notification.",
+                self, self.expected_start, self.expected_finish
+            )
+
+            if self.expected_duration and self.expected_start \

Review comment:
       good catch!

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):

Review comment:
       sure

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.

Review comment:
       Not sure what that means either; I'll remove it.

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):

Review comment:
       it's created in line 1688; the idea is to take a snapshot of the current time and use that for all the comparisons, as opposed to fetching a fresh timestamp on the fly. I'm indifferent between the two approaches, and happy to change it if there's a valid reason to prefer either one.
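
    To spell out the trade-off with a small standalone sketch (stdlib only, not the PR's actual helpers): snapshotting once keeps every comparison in a batch consistent, while fetching the time on the fly lets the reference point drift over a long loop.
    ```python
    from datetime import datetime, timezone

    def check_batch_with_snapshot(task_instances, check_fn):
        # One reference time for the whole batch: every TI is compared
        # against the same "now", even if the loop takes a while to run.
        ts = datetime.now(timezone.utc)
        return [check_fn(ti, ts) for ti in task_instances]

    def check_batch_on_the_fly(task_instances, check_fn):
        # A fresh timestamp per TI: later TIs are checked against a slightly
        # later "now", so results can differ within a single batch.
        return [check_fn(ti, datetime.now(timezone.utc)) for ti in task_instances]
    ```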

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):
+        """
+        Given a list of SLA misses, send emails and/or do SLA miss callback.
+        """
+        if not sla_misses:
+            self.log.warning("send_sla_notifications was called without any "
+                             "SLA notifications to send!")
+            return
+
+        # Retrieve the context for this TI, but patch in the SLA miss object.
+        for sla_miss in sla_misses:
+            if sla_miss.notification_sent:
+                self.log.debug("SLA miss %s has already had a notification sent, "
+                               "ignoring.", sla_miss)
+
+            TI = TaskInstance
+            ti = session.query(TI).filter(
+                TI.dag_id == sla_miss.dag_id,
+                TI.task_id == sla_miss.task_id,
+                TI.execution_date == sla_miss.execution_date,
+            ).all()
+
+            # Use the TI if found
+            task = self.get_task(sla_miss.task_id)
+            if ti:
+                ti = ti.pop()
+                ti.task = task
+            # Else make a temporary one.
+            else:
+                ti = TaskInstance(task, sla_miss.execution_date)
+                ti.task = task

Review comment:
       I believe this is for cases where an SLAMiss is triggered for task instances that have yet to be scheduled. For example, if a TI has never been scheduled due to concurrency limits and its task has `expected_start` set, an SLAMiss will be created even though there is no corresponding TI in the database.
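
    In other words, the SLA miss can exist before any TaskInstance row does. A hedged sketch of that lookup-or-fabricate step (the model class is passed in as a parameter so nothing here asserts Airflow's actual imports):
    ```python
    # Illustrative sketch only, not the PR's exact control flow.
    def resolve_ti_for_sla_miss(session, dag, sla_miss, TaskInstance):
        ti = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == sla_miss.dag_id,
                TaskInstance.task_id == sla_miss.task_id,
                TaskInstance.execution_date == sla_miss.execution_date,
            )
            .one_or_none()
        )
        task = dag.get_task(sla_miss.task_id)
        if ti is None:
            # No row exists yet, e.g. an expected_start miss for a run that
            # was never created because of concurrency limits, so fabricate
            # a transient TI purely to render the callback context.
            ti = TaskInstance(task, sla_miss.execution_date)
        ti.task = task
        return ti
    ```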

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):
+        """
+        Given a list of SLA misses, send emails and/or do SLA miss callback.
+        """
+        if not sla_misses:
+            self.log.warning("send_sla_notifications was called without any "
+                             "SLA notifications to send!")
+            return
+
+        # Retrieve the context for this TI, but patch in the SLA miss object.
+        for sla_miss in sla_misses:
+            if sla_miss.notification_sent:
+                self.log.debug("SLA miss %s has already had a notification sent, "
+                               "ignoring.", sla_miss)
+
+            TI = TaskInstance
+            ti = session.query(TI).filter(
+                TI.dag_id == sla_miss.dag_id,
+                TI.task_id == sla_miss.task_id,
+                TI.execution_date == sla_miss.execution_date,
+            ).all()
+
+            # Use the TI if found
+            task = self.get_task(sla_miss.task_id)
+            if ti:
+                ti = ti.pop()
+                ti.task = task
+            # Else make a temporary one.
+            else:
+                ti = TaskInstance(task, sla_miss.execution_date)
+                ti.task = task
+
+            notification_sent = False
+            # If no callback exists, we don't want to send any notification;
+            # but we do want to update the SlaMiss in the database so that it
+            # doesn't keep looping.
+            if not task.sla_miss_callback:
+                notification_sent = True
+            else:
+                self.log.info("Calling sla_miss_callback for %s", sla_miss)
+                try:
+                    # Patch context with the current SLA miss.
+                    context = ti.get_template_context()
+                    context["sla_miss"] = sla_miss
+                    task.sla_miss_callback(context)
+                    notification_sent = True

Review comment:
       I'm not sure either; setting it directly makes more sense to me.
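
    For what it's worth, one way to make that explicit (a sketch, not the PR's code) is a try/except/else, so the flag only flips when the callback ran without raising:
    ```python
    import logging

    log = logging.getLogger(__name__)

    def run_sla_miss_callback(task, context):
        notification_sent = False
        if not task.sla_miss_callback:
            # Nothing to call; mark as handled so the miss doesn't loop forever.
            notification_sent = True
        else:
            try:
                task.sla_miss_callback(context)
            except Exception:
                log.exception("sla_miss_callback failed; will retry next cycle")
            else:
                # Only reached if the callback did not raise.
                notification_sent = True
        return notification_sent
    ```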




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] houqp commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r421708159



##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):

Review comment:
       It should be enforced in the CI pipeline at build time. If you tested mypy and it isn't complaining about this, that's because mypy is still an evolving project and can't catch every type error yet, but it's safe to assume it will eventually catch up.
   
   I wouldn't worry too much about these edge cases, to be honest. We don't do runtime type checks anywhere else in the code base where a type hint is defined, so it's a little odd to leave this one as a special case. On top of that, this runtime type check results in an exception, so the end result is the same as having no check at all: both paths lead to unrecoverable crashes.
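
    As a concrete illustration of the build-time vs. runtime distinction (a standalone sketch, not code from this PR):
    ```python
    from datetime import timedelta

    def set_expected_duration(expected_duration: timedelta) -> timedelta:
        # No isinstance() check here: the annotation states the contract, and
        # a static checker such as mypy, run in CI, can flag violations
        # before anything executes.
        return expected_duration

    # mypy would report something along the lines of:
    #   error: Argument 1 to "set_expected_duration" has incompatible type
    #          "int"; expected "timedelta"
    # Plain Python does not enforce the annotation at runtime, so the bad
    # value only surfaces later, wherever it is actually used.
    set_expected_duration(30)
    ```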




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Eronarn commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
Eronarn commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-621298326


   Thank you for taking this on. I couldn't justify any more time on it but I think it's still very relevant to the project.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] seanxwzhang commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r418414577



##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):

Review comment:
       Oops, actually I think type hints are not enforced at runtime in PY3, so we'll still need these bits.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] smith-m commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
smith-m commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-696458546


   Is development on the task-level SLA miss callback active?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] BasPH commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
BasPH commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r416816158



##########
File path: airflow/models/baseoperator.py
##########
@@ -179,18 +180,49 @@ class derived from this one results in the creation of a task object,
     :param pool_slots: the number of pool slots this task should use (>= 1)
         Values less than 1 are not allowed.
     :type pool_slots: int
-    :param sla: time by which the job is expected to succeed. Note that
-        this represents the ``timedelta`` after the period is closed. For
-        example if you set an SLA of 1 hour, the scheduler would send an email
-        soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
-        has not succeeded yet.
-        The scheduler pays special attention for jobs with an SLA and
-        sends alert
-        emails for sla misses. SLA misses are also recorded in the database
-        for future reference. All tasks that share the same SLA time
-        get bundled in a single email, sent soon after that time. SLA
-        notification are sent once and only once for each task instance.
+    :param sla: Deprecated in favor of ``expected_finish``.
     :type sla: datetime.timedelta
+    :param expected_duration: the maximum duration the task is allowed to take,
+        provided as a ``timedelta``. This ``timedelta`` is relative to when the
+        task actually starts running, not to the execution date. It includes

Review comment:
       Nobody expects duration to start counting from the execution_date; I wouldn't mention it, to avoid confusion.
   
   ```suggestion
           task actually starts running. It includes
   ```

##########
File path: airflow/models/baseoperator.py
##########
@@ -179,18 +180,49 @@ class derived from this one results in the creation of a task object,
     :param pool_slots: the number of pool slots this task should use (>= 1)
         Values less than 1 are not allowed.
     :type pool_slots: int
-    :param sla: time by which the job is expected to succeed. Note that
-        this represents the ``timedelta`` after the period is closed. For
-        example if you set an SLA of 1 hour, the scheduler would send an email
-        soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
-        has not succeeded yet.
-        The scheduler pays special attention for jobs with an SLA and
-        sends alert
-        emails for sla misses. SLA misses are also recorded in the database
-        for future reference. All tasks that share the same SLA time
-        get bundled in a single email, sent soon after that time. SLA
-        notification are sent once and only once for each task instance.
+    :param sla: Deprecated in favor of ``expected_finish``.
     :type sla: datetime.timedelta
+    :param expected_duration: the maximum duration the task is allowed to take,
+        provided as a ``timedelta``. This ``timedelta`` is relative to when the
+        task actually starts running, not to the execution date. It includes
+        any task retries, similar to the ``execution_timeout`` parameter.
+        SLA misses are stored in the database, and then SLA miss callbacks
+        are triggered. The default SLA miss callback is to email task owners.
+        Note that SLA parameters do not influence the scheduler, so you should
+        set appropriate SLA parameters given your DAG's schedule. For example,
+        setting an ``expected_start`` of ``timedelta(hours=2)`` on a task that
+        depends on a ``TimeDeltaSensor`` with ``timedelta(hours=3)`` would be
+        expected to always cause an SLA miss.
+    :type expected_duration: datetime.timedelta
+    :param expected_start: time by which the task is expected to start,
+        provided as a ``timedelta`` relative to the expected start time of
+        this task's DAG. For instance, if you set an ``expected_start`` of
+        ``timedelta(hours=1)`` on a task inside of a DAG that runs on a

Review comment:
       ```suggestion
           ``timedelta(hours=1)`` on a task inside a DAG that runs on a
   ```

##########
File path: airflow/models/baseoperator.py
##########
@@ -179,18 +180,49 @@ class derived from this one results in the creation of a task object,
     :param pool_slots: the number of pool slots this task should use (>= 1)
         Values less than 1 are not allowed.
     :type pool_slots: int
-    :param sla: time by which the job is expected to succeed. Note that
-        this represents the ``timedelta`` after the period is closed. For
-        example if you set an SLA of 1 hour, the scheduler would send an email
-        soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance
-        has not succeeded yet.
-        The scheduler pays special attention for jobs with an SLA and
-        sends alert
-        emails for sla misses. SLA misses are also recorded in the database
-        for future reference. All tasks that share the same SLA time
-        get bundled in a single email, sent soon after that time. SLA
-        notification are sent once and only once for each task instance.
+    :param sla: Deprecated in favor of ``expected_finish``.
     :type sla: datetime.timedelta
+    :param expected_duration: the maximum duration the task is allowed to take,
+        provided as a ``timedelta``. This ``timedelta`` is relative to when the
+        task actually starts running, not to the execution date. It includes
+        any task retries, similar to the ``execution_timeout`` parameter.
+        SLA misses are stored in the database, and then SLA miss callbacks
+        are triggered. The default SLA miss callback is to email task owners.
+        Note that SLA parameters do not influence the scheduler, so you should
+        set appropriate SLA parameters given your DAG's schedule. For example,
+        setting an ``expected_start`` of ``timedelta(hours=2)`` on a task that
+        depends on a ``TimeDeltaSensor`` with ``timedelta(hours=3)`` would be
+        expected to always cause an SLA miss.

Review comment:
       This applies to `expected_start`, not `expected_duration`.

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types

Review comment:
       Can we leave out things like `__future__` and `six`? We don't support Python 2 anymore.
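
    For example (assuming `string_types` is the only thing pulled from `six` here), the Python-3-only equivalent is just the built-in `str`:
    ```python
    # Python 3 only: no __future__ or six imports required.
    def is_string(value) -> bool:
        return isinstance(value, str)
    ```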

##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):
+            sla_param_errs.append("expected_duration must be a timedelta, "
+                                  "got: {}".format(expected_duration))
+        if expected_start and not isinstance(expected_start, timedelta):
+            sla_param_errs.append("expected_start must be a timedelta, "
+                                  "got: {}".format(expected_start))

Review comment:
       ```suggestion
                                     "got: {}".format(type(expected_start)))
   ```

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       Can we store this script in, say, `airflow/sla.py`? It holds functions essential to the SLA functionality, not "utilities".

##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):
+            sla_param_errs.append("expected_duration must be a timedelta, "
+                                  "got: {}".format(expected_duration))

Review comment:
       Shouldn't we print the type here?
   ```suggestion
                                     "got: {}".format(type(expected_duration)))
   ```
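
    One variant (just a sketch, not necessarily what the PR should do) keeps both the offending value and its type in the message, which makes a bare int of seconds immediately recognisable:
    ```python
    from datetime import timedelta

    expected_duration = 3600  # oops: an int number of seconds, not a timedelta
    if expected_duration and not isinstance(expected_duration, timedelta):
        print("expected_duration must be a timedelta, got: {} ({})".format(
            expected_duration, type(expected_duration)))
        # -> expected_duration must be a timedelta, got: 3600 (<class 'int'>)
    ```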

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(

Review comment:
       Let's use the `%s` formatting here
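
    Presumably the point is lazy interpolation: with `%s`-style arguments the message is only built if the record is actually emitted. A minimal sketch:
    ```python
    import logging

    log = logging.getLogger(__name__)

    # Eager: the string is formatted even when DEBUG is disabled.
    log.debug("Found {} outstanding TIs across {} dagruns for DAG {}".format(
        10, 3, "example_dag"))

    # Lazy: formatting is deferred to the logging framework and skipped
    # entirely if the DEBUG level is not enabled for this logger.
    log.debug("Found %s outstanding TIs across %s dagruns for DAG %s",
              10, 3, "example_dag")
    ```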

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]

Review comment:
       Agree, 100 seems like quite a lot too.
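
    A configurable window could look roughly like the following; the `sla_lookback_window` name and its default are purely hypothetical:
    ```python
    # Hypothetical sketch of a per-DAG lookback cap for SLA checks.
    DEFAULT_SLA_LOOKBACK_WINDOW = 100

    def limit_dagruns_for_sla(scheduled_dagruns, sla_lookback_window=None):
        window = sla_lookback_window or DEFAULT_SLA_LOOKBACK_WINDOW
        return scheduled_dagruns[-window:]
    ```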

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):

Review comment:
       Where does `ts` come from?

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.

Review comment:
       +1!

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):

Review comment:
       Can we add a type hint to sla_misses?
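       For example, a rough sketch of what the annotated signature could look like (using a string annotation so no extra `SlaMiss` import is needed; the `Session`/`Optional` names here are assumptions):
    ```python
    from typing import List, Optional

    from sqlalchemy.orm import Session


    @provide_session
    def send_sla_notifications(self, sla_misses: List["SlaMiss"],
                               session: Optional[Session] = None) -> None:
        """Given a list of SLA misses, send emails and/or do SLA miss callbacks."""
    ```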

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):
+        """
+        Given a list of SLA misses, send emails and/or do SLA miss callback.
+        """
+        if not sla_misses:
+            self.log.warning("send_sla_notifications was called without any "
+                             "SLA notifications to send!")
+            return
+
+        # Retrieve the context for this TI, but patch in the SLA miss object.
+        for sla_miss in sla_misses:
+            if sla_miss.notification_sent:
+                self.log.debug("SLA miss %s has already had a notification sent, "
+                               "ignoring.", sla_miss)
+
+            TI = TaskInstance
+            ti = session.query(TI).filter(
+                TI.dag_id == sla_miss.dag_id,
+                TI.task_id == sla_miss.task_id,
+                TI.execution_date == sla_miss.execution_date,
+            ).all()
+
+            # Use the TI if found
+            task = self.get_task(sla_miss.task_id)
+            if ti:
+                ti = ti.pop()
+                ti.task = task
+            # Else make a temporary one.
+            else:
+                ti = TaskInstance(task, sla_miss.execution_date)
+                ti.task = task
+
+            notification_sent = False
+            # If no callback exists, we don't want to send any notification;
+            # but we do want to update the SlaMiss in the database so that it
+            # doesn't keep looping.
+            if not task.sla_miss_callback:
+                notification_sent = True
+            else:
+                self.log.info("Calling sla_miss_callback for %s", sla_miss)
+                try:
+                    # Patch context with the current SLA miss.
+                    context = ti.get_template_context()
+                    context["sla_miss"] = sla_miss
+                    task.sla_miss_callback(context)
+                    notification_sent = True

Review comment:
       Why use an additional variable instead of setting `notification_sent` directly on `sla_miss`?
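       A rough sketch of that alternative (setting the flag on `sla_miss` at each success point and dropping the local variable):
    ```python
    if not task.sla_miss_callback:
        # No callback configured: mark as sent so this SlaMiss stops looping.
        sla_miss.notification_sent = True
    else:
        self.log.info("Calling sla_miss_callback for %s", sla_miss)
        try:
            # Patch context with the current SLA miss.
            context = ti.get_template_context()
            context["sla_miss"] = sla_miss
            task.sla_miss_callback(context)
            sla_miss.notification_sent = True
            self.log.debug("Called sla_miss_callback for %s", sla_miss)
        except Exception:
            self.log.exception(
                "Could not call sla_miss_callback for DAG %s", self.dag_id)
    ```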

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):
+    """
+    Yield new DagRuns that haven't been created yet. This functionality is
+    important to SLA misses because it is possible for the scheduler to fall
+    so far behind that it cannot create a DAGRun when it is supposed to (like
+    if it is offline, or if there are strict concurrency limits). We need to
+    understand and alert on what DAGRuns *should* have been created by this
+    point in time.
+    """
+
+    # TODO: A lot of this logic is duplicated from the scheduler. It would
+    # be better to have one function that yields upcoming DAG runs in a
+    # consistent way that is usable for both use cases.
+
+    # Start by assuming that there is no next run.
+    next_run_date = None
+
+    # The first DAGRun has not been created yet.
+    if not last_scheduled_run:
+        task_start_dates = [t.start_date for t in dag.tasks]
+        if task_start_dates:
+            next_run_date = dag.normalize_schedule(min(task_start_dates))
+    # The DagRun is @once and has already happened.
+    elif dag.schedule_interval == '@once':
+        return
+    # Start from the next "normal" run.
+    else:
+        next_run_date = dag.following_schedule(last_scheduled_run.execution_date)
+
+    while True:
+        # There should be a next execution.
+        if not next_run_date:
+            return
+
+        # The next execution shouldn't be in the future.
+        if next_run_date > ts:
+            return
+
+        # The next execution shouldn't be beyond the DAG's end date.
+        # n.b. - tasks have their own end dates checked later
+        if next_run_date and dag.end_date and next_run_date > dag.end_date:
+            return
+
+        # Calculate the end of this execution period.
+        if dag.schedule_interval == '@once':
+            period_end = next_run_date
+        else:
+            period_end = dag.following_schedule(next_run_date)
+
+        # The next execution shouldn't still be mid-period.
+        if period_end > ts:
+            return
+
+        # We've passed every filter; this is a valid future DagRun that
+        # presumably hasn't been scheduled due to concurrency limits.
+        # Create and yield a fake DAGRun, which won't exist in the db yet.
+        next_run = airflow.models.DagRun(
+            dag_id=dag.dag_id,
+            run_id='manual__' + next_run_date.isoformat(),
+            execution_date=next_run_date,
+            start_date=ts,
+            state=State.NONE,
+            external_trigger=False,
+        )
+        next_run.dag = dag
+        yield next_run
+
+        # Examine the next date.
+        next_run_date = dag.following_schedule(next_run_date)
+
+
+def yield_unscheduled_tis(dag_run, ts):

Review comment:
       Same here: rename to e.g. "yield_uncreated_tis", and document `ts` in the docstring?
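       For example, a sketch of the renamed function with `ts` documented (wording is only a suggestion):
    ```python
    def yield_uncreated_tis(dag_run, ts):
        """
        Given an uncreated `DagRun`, yield the TaskInstances that would exist
        for it, respecting the end dates of the DAG and of each task.

        :param dag_run: an in-memory-only DagRun to expand into task instances.
        :param ts: the timezone-aware timestamp that SLAs are being evaluated
            against; a TI is only yielded if `ts` is before every configured
            DAG/task end date.
        """
    ```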

##########
File path: airflow/models/baseoperator.py
##########
@@ -392,10 +442,79 @@ def __init__(
                                    % (self.task_id, dag.dag_id))
         self.sla = sla
         self.execution_timeout = execution_timeout
+
+        # Warn about use of the deprecated SLA parameter
+        if sla and expected_finish:
+            warnings.warn(
+                "Both sla and expected_finish provided as task "
+                "parameters to {}; using expected_finish and ignoring "
+                "deprecated sla parameter.".format(self),
+                category=PendingDeprecationWarning
+            )
+        elif sla:
+            warnings.warn(
+                "sla is deprecated as a task parameter for {}; converting to "
+                "expected_finish instead.".format(self),
+                category=PendingDeprecationWarning
+            )
+            expected_finish = sla
+
+        # Set SLA parameters, batching invalid type messages into a
+        # single exception.
+        sla_param_errs: List = []
+        if expected_duration and not isinstance(expected_duration, timedelta):
+            sla_param_errs.append("expected_duration must be a timedelta, "
+                                  "got: {}".format(expected_duration))
+        if expected_start and not isinstance(expected_start, timedelta):
+            sla_param_errs.append("expected_start must be a timedelta, "
+                                  "got: {}".format(expected_start))
+        if expected_finish and not isinstance(expected_finish, timedelta):
+            sla_param_errs.append("expected_finish must be a timedelta, "
+                                  "got: {}".format(expected_finish))

Review comment:
       ```suggestion
                                     "got: {}".format(type(expected_finish)))
   ```

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.

Review comment:
       full?

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):
+        """
+        Given a list of SLA misses, send emails and/or do SLA miss callback.
+        """
+        if not sla_misses:
+            self.log.warning("send_sla_notifications was called without any "
+                             "SLA notifications to send!")
+            return
+
+        # Retrieve the context for this TI, but patch in the SLA miss object.
+        for sla_miss in sla_misses:
+            if sla_miss.notification_sent:
+                self.log.debug("SLA miss %s has already had a notification sent, "
+                               "ignoring.", sla_miss)
+
+            TI = TaskInstance
+            ti = session.query(TI).filter(
+                TI.dag_id == sla_miss.dag_id,
+                TI.task_id == sla_miss.task_id,
+                TI.execution_date == sla_miss.execution_date,
+            ).all()
+
+            # Use the TI if found
+            task = self.get_task(sla_miss.task_id)
+            if ti:
+                ti = ti.pop()
+                ti.task = task
+            # Else make a temporary one.
+            else:
+                ti = TaskInstance(task, sla_miss.execution_date)
+                ti.task = task
+
+            notification_sent = False
+            # If no callback exists, we don't want to send any notification;
+            # but we do want to update the SlaMiss in the database so that it
+            # doesn't keep looping.
+            if not task.sla_miss_callback:
+                notification_sent = True
+            else:
+                self.log.info("Calling sla_miss_callback for %s", sla_miss)
+                try:
+                    # Patch context with the current SLA miss.
+                    context = ti.get_template_context()
+                    context["sla_miss"] = sla_miss
+                    task.sla_miss_callback(context)
+                    notification_sent = True
+                    self.log.debug("Called sla_miss_callback for %s", sla_miss)
+                except Exception:
+                    self.log.exception(
+                        "Could not call sla_miss_callback for DAG %s",
+                        self.dag_id
+                    )

Review comment:
       Should we log the exception itself here?
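       (`log.exception` already attaches the traceback; if the idea is to also surface the error message in the log line itself, a sketch could be:)
    ```python
    except Exception as err:  # pylint: disable=broad-except
        self.log.exception(
            "Could not call sla_miss_callback for DAG %s: %s",
            self.dag_id, err
        )
    ```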

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):
+    """
+    Yield new DagRuns that haven't been created yet. This functionality is
+    important to SLA misses because it is possible for the scheduler to fall
+    so far behind that it cannot create a DAGRun when it is supposed to (like
+    if it is offline, or if there are strict concurrency limits). We need to
+    understand and alert on what DAGRuns *should* have been created by this
+    point in time.
+    """
+
+    # TODO: A lot of this logic is duplicated from the scheduler. It would
+    # be better to have one function that yields upcoming DAG runs in a
+    # consistent way that is usable for both use cases.
+
+    # Start by assuming that there is no next run.
+    next_run_date = None
+
+    # The first DAGRun has not been created yet.
+    if not last_scheduled_run:
+        task_start_dates = [t.start_date for t in dag.tasks]
+        if task_start_dates:
+            next_run_date = dag.normalize_schedule(min(task_start_dates))
+    # The DagRun is @once and has already happened.
+    elif dag.schedule_interval == '@once':
+        return
+    # Start from the next "normal" run.
+    else:
+        next_run_date = dag.following_schedule(last_scheduled_run.execution_date)
+
+    while True:
+        # There should be a next execution.
+        if not next_run_date:
+            return
+
+        # The next execution shouldn't be in the future.
+        if next_run_date > ts:
+            return
+
+        # The next execution shouldn't be beyond the DAG's end date.
+        # n.b. - tasks have their own end dates checked later
+        if next_run_date and dag.end_date and next_run_date > dag.end_date:
+            return
+
+        # Calculate the end of this execution period.
+        if dag.schedule_interval == '@once':
+            period_end = next_run_date
+        else:
+            period_end = dag.following_schedule(next_run_date)
+
+        # The next execution shouldn't still be mid-period.
+        if period_end > ts:
+            return
+
+        # We've passed every filter; this is a valid future DagRun that
+        # presumably hasn't been scheduled due to concurrency limits.
+        # Create and yield a fake DAGRun, which won't exist in the db yet.
+        next_run = airflow.models.DagRun(
+            dag_id=dag.dag_id,
+            run_id='manual__' + next_run_date.isoformat(),
+            execution_date=next_run_date,
+            start_date=ts,
+            state=State.NONE,
+            external_trigger=False,
+        )
+        next_run.dag = dag
+        yield next_run
+
+        # Examine the next date.
+        next_run_date = dag.following_schedule(next_run_date)
+
+
+def yield_unscheduled_tis(dag_run, ts):
+    """
+    Given an unscheduled `DagRun`, yield any unscheduled TIs that will exist
+    for it in the future, respecting the end date of the DAG and task. See note
+    above for why this is important for SLA notifications.
+    """
+    for task in dag_run.dag.tasks:
+        end_dates = []
+        if dag_run.dag.end_date:
+            end_dates.append(dag_run.dag.end_date)
+        if task.end_date:
+            end_dates.append(task.end_date)
+
+        # Create TIs if there is no end date, or it hasn't happened yet.
+        if not end_dates or ts < min(end_dates):
+            yield airflow.models.TaskInstance(task, dag_run.execution_date)
+
+
+def get_sla_misses(ti, session):
+    """
+    Get all SLA misses that match a particular TaskInstance. There may be
+    several matches if the Task has several independent SLAs.
+    """
+    SM = airflow.models.SlaMiss
+    return session.query(SM).filter(
+        SM.dag_id == ti.dag_id,
+        SM.task_id == ti.task_id,
+        SM.execution_date == ti.execution_date
+    ).all()
+
+
+def create_sla_misses(ti, timestamp, session):
+    """
+    Determine whether a TaskInstance has missed any SLAs as of a provided
+    timestamp. If it has, create `SlaMiss` objects in the provided session.
+    Note that one TaskInstance can have multiple SLA miss objects: for example,
+    it can both start late and run longer than expected.
+    """
+    # Skipped task instances will never trigger SLAs because they
+    # were intentionally not scheduled. Though, it's still a valid and
+    # interesting SLA miss if a task that's *going* to be skipped today is
+    # late! That could mean that an upstream task is hanging.
+    if ti.state == State.SKIPPED:
+        return
+
+    log.debug("Calculating SLA misses for %s as of %s", ti, timestamp)
+
+    SM = airflow.models.SlaMiss
+
+    # Get existing SLA misses for this task instance.
+    ti_misses = {sm.sla_type: sm for sm in get_sla_misses(ti, session)}

Review comment:
       We never use the values of this dict. How about we create a set of the sla_types and rename for more clarity:
   ```suggestion
       existing_sla_miss_types = {sm.sla_type for sm in get_sla_misses(ti, session)}
   ```
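       The later membership checks would then read against that set, e.g. (sketch):
    ```python
    # SLA Miss for Expected Duration
    if SM.TASK_DURATION_EXCEEDED not in existing_sla_miss_types \
            and ti.task.expected_duration and ti.start_date:
        ...
    ```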

##########
File path: airflow/models/dag.py
##########
@@ -1582,6 +1580,196 @@ def sync_to_db(self, sync_time=None, session=None):
         """
         self.bulk_sync_to_db([self], sync_time, session)
 
+    @provide_session
+    def manage_slas(self, session=None):
+        """
+        Helper function to encapsulate the sequence of SLA operations.
+        """
+        # Create SlaMiss objects for the various types of SLA misses.
+        self.record_sla_misses(session=session)
+
+        # Collect pending SLA miss callbacks, either created immediately above
+        # or previously failed.
+        unsent_sla_misses = self.get_unsent_sla_notifications(session=session)
+        self.log.debug("Found %s unsent SLA miss notifications",
+                       len(unsent_sla_misses))
+
+        # Trigger the SLA miss callbacks.
+        if unsent_sla_misses:
+            self.send_sla_notifications(unsent_sla_misses, session=session)
+
+    @provide_session
+    def record_sla_misses(self, session=None):
+        """
+        Create SLAMiss records for task instances associated with tasks in this
+        DAG. This involves walking forward to address potentially unscheduled
+        but expected executions, since new DAG runs may not get created if
+        there are concurrency restrictions on the scheduler. We still want to
+        receive SLA notifications in that scenario!
+        In the future, it would be preferable to have an SLA monitoring service
+        that runs independently from the scheduler, so that the service
+        responsible for scheduling work is not also responsible for determining
+        whether work is being scheduled.
+        """
+        self.log.debug("Checking for SLA misses for DAG %s", self.dag_id)
+
+        # Get all current DagRuns.
+        scheduled_dagruns = DagRun.find(
+            dag_id=self.dag_id,
+            # TODO related to AIRFLOW-2236: determine how SLA misses should
+            # work for backfills and externally triggered
+            # DAG runs. At minimum they could have duration SLA misses.
+            external_trigger=False,
+            no_backfills=True,
+            # We aren't passing in the "state" parameter because we care about
+            # checking for SLAs whether the DAG run has failed, succeeded, or
+            # is still running.
+            session=session
+        )
+
+        # TODO: Is there a better limit here than "look at most recent 100"?
+        # Perhaps there should be a configurable lookback window on the DAG,
+        # for how many runs to consider SLA violations for.
+        scheduled_dagruns = scheduled_dagruns[-100:]
+        scheduled_dagrun_ids = [d.id for d in scheduled_dagruns]
+
+        TI = TaskInstance
+        DR = DagRun
+
+        if scheduled_dagrun_ids:
+            # Find full, existing TIs for these DagRuns.
+            scheduled_tis = (
+                session.query(TI)
+                .outerjoin(DR, and_(
+                    DR.dag_id == TI.dag_id,
+                    DR.execution_date == TI.execution_date))
+                # Only look at TIs for this DAG.
+                .filter(TI.dag_id == self.dag_id)
+                # Only look at TIs that *still* exist in this DAG.
+                .filter(TI.task_id.in_(self.task_ids))
+                # Don't look for success/skip TIs. We check SLAs often, so
+                # there's little chance that a TI switches to successful
+                # after an SLA miss but before we notice; and this should
+                # be a major perf boost (since most TIs are successful or
+                # skipped).
+                .filter(or_(
+                    # has to be written this way to account for sql nulls
+                    TI.state == None, # noqa E711
+                    not_(TI.state.in_((State.SUCCESS, State.SKIPPED)))
+                ))
+                # Only look at specified DagRuns
+                .filter(DR.id.in_(scheduled_dagrun_ids))
+                # If the DAGRun is SUCCEEDED, then everything has gone
+                # according to plan. But if it's FAILED, someone may be
+                # coming to fix it, and SLAs for tasks in it will still
+                # matter.
+                .filter(DR.state != State.SUCCESS)
+                .order_by(asc(DR.execution_date))
+                .all()
+            )
+        else:
+            scheduled_tis = []
+
+        self.log.debug(
+            "Found {} outstanding TIs across {} dagruns for DAG {}".format(
+                len(scheduled_tis), len(scheduled_dagruns), self.dag_id))
+
+        # We need to examine unscheduled DAGRuns, too. If there are concurrency
+        # limitations, it's possible that a task instance will miss its SLA
+        # before its corresponding DAGRun even gets created.
+        last_dagrun = scheduled_dagruns[-1] if scheduled_dagruns else None
+
+        def unscheduled_tis(last_dagrun):
+            for dag_run in yield_unscheduled_runs(self, last_dagrun, ts):
+                for ti in yield_unscheduled_tis(dag_run, ts):
+                    yield ti
+
+        # Snapshot the time to check SLAs against.
+        ts = timezone.utcnow()
+
+        for ti in itertools.chain(scheduled_tis, unscheduled_tis(last_dagrun)):
+            ti.task = self.task_dict[ti.task_id]
+            # Ignore tasks that don't have SLAs, saving most calculation of
+            # future task instances.
+            if ti.task.has_slas():
+                create_sla_misses(ti, ts, session=session)
+
+        # Save any SlaMisses that were created in `create_sla_misses()`
+        session.commit()
+
+    @provide_session
+    def get_unsent_sla_notifications(self, session=None):
+        """
+        Find all SlaMisses for this DAG that haven't yet been notified.
+        """
+        return (
+            session
+            .query(SlaMiss)
+            .filter(SlaMiss.notification_sent == False)  # noqa
+            .filter(SlaMiss.dag_id == self.dag_id)
+            .all()
+        )
+
+    @provide_session
+    def send_sla_notifications(self, sla_misses, session=None):
+        """
+        Given a list of SLA misses, send emails and/or do SLA miss callback.
+        """
+        if not sla_misses:
+            self.log.warning("send_sla_notifications was called without any "
+                             "SLA notifications to send!")
+            return
+
+        # Retrieve the context for this TI, but patch in the SLA miss object.
+        for sla_miss in sla_misses:
+            if sla_miss.notification_sent:
+                self.log.debug("SLA miss %s has already had a notification sent, "
+                               "ignoring.", sla_miss)
+
+            TI = TaskInstance
+            ti = session.query(TI).filter(
+                TI.dag_id == sla_miss.dag_id,
+                TI.task_id == sla_miss.task_id,
+                TI.execution_date == sla_miss.execution_date,
+            ).all()
+
+            # Use the TI if found
+            task = self.get_task(sla_miss.task_id)
+            if ti:
+                ti = ti.pop()
+                ti.task = task
+            # Else make a temporary one.
+            else:
+                ti = TaskInstance(task, sla_miss.execution_date)
+                ti.task = task

Review comment:
       This logic is confusing to me.
   
       So `ti` holds all matching task instances (which I assume is at most one, since we filter on dag_id, task_id and execution_date). If a row is found, we pop it and attach the task object from the DAG to it. But is there any guarantee we're pairing the right task with the right TI?
   
       Otherwise we create a new, temporary task instance? Why is that, and is that case even possible?
   
       What is this code for?
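       If the intent is "reuse the stored TI when one exists, otherwise build a throwaway TI just so `get_template_context()` can be rendered", a sketch that might make that clearer (using `one_or_none()`, assuming at most one row can match the composite key):
    ```python
    task = self.get_task(sla_miss.task_id)
    ti = session.query(TI).filter(
        TI.dag_id == sla_miss.dag_id,
        TI.task_id == sla_miss.task_id,
        TI.execution_date == sla_miss.execution_date,
    ).one_or_none()

    if ti is None:
        # No stored TI, e.g. the SlaMiss was created for a run that never got
        # scheduled; build a transient one so the template context still works.
        ti = TaskInstance(task, sla_miss.execution_date)

    ti.task = task
    ```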

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):

Review comment:
       Can we rename to something more meaningful, e.g. "yield_uncreated_runs"?

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):
+    """
+    Yield new DagRuns that haven't been created yet. This functionality is
+    important to SLA misses because it is possible for the scheduler to fall
+    so far behind that it cannot create a DAGRun when it is supposed to (like
+    if it is offline, or if there are strict concurrency limits). We need to
+    understand and alert on what DAGRuns *should* have been created by this
+    point in time.
+    """
+
+    # TODO: A lot of this logic is duplicated from the scheduler. It would
+    # be better to have one function that yields upcoming DAG runs in a
+    # consistent way that is usable for both use cases.
+
+    # Start by assuming that there is no next run.
+    next_run_date = None
+
+    # The first DAGRun has not been created yet.
+    if not last_scheduled_run:
+        task_start_dates = [t.start_date for t in dag.tasks]
+        if task_start_dates:
+            next_run_date = dag.normalize_schedule(min(task_start_dates))
+    # The DagRun is @once and has already happened.
+    elif dag.schedule_interval == '@once':
+        return
+    # Start from the next "normal" run.
+    else:
+        next_run_date = dag.following_schedule(last_scheduled_run.execution_date)
+
+    while True:
+        # There should be a next execution.
+        if not next_run_date:
+            return
+
+        # The next execution shouldn't be in the future.
+        if next_run_date > ts:
+            return
+
+        # The next execution shouldn't be beyond the DAG's end date.
+        # n.b. - tasks have their own end dates checked later
+        if next_run_date and dag.end_date and next_run_date > dag.end_date:
+            return
+
+        # Calculate the end of this execution period.
+        if dag.schedule_interval == '@once':
+            period_end = next_run_date
+        else:
+            period_end = dag.following_schedule(next_run_date)
+
+        # The next execution shouldn't still be mid-period.
+        if period_end > ts:
+            return
+
+        # We've passed every filter; this is a valid future DagRun that
+        # presumably hasn't been scheduled due to concurrency limits.
+        # Create and yield a fake DAGRun, which won't exist in the db yet.
+        next_run = airflow.models.DagRun(
+            dag_id=dag.dag_id,
+            run_id='manual__' + next_run_date.isoformat(),
+            execution_date=next_run_date,
+            start_date=ts,
+            state=State.NONE,
+            external_trigger=False,
+        )
+        next_run.dag = dag
+        yield next_run
+
+        # Examine the next date.
+        next_run_date = dag.following_schedule(next_run_date)
+
+
+def yield_unscheduled_tis(dag_run, ts):
+    """
+    Given an unscheduled `DagRun`, yield any unscheduled TIs that will exist
+    for it in the future, respecting the end date of the DAG and task. See note
+    above for why this is important for SLA notifications.
+    """
+    for task in dag_run.dag.tasks:
+        end_dates = []
+        if dag_run.dag.end_date:
+            end_dates.append(dag_run.dag.end_date)
+        if task.end_date:
+            end_dates.append(task.end_date)
+
+        # Create TIs if there is no end date, or it hasn't happened yet.
+        if not end_dates or ts < min(end_dates):
+            yield airflow.models.TaskInstance(task, dag_run.execution_date)
+
+
+def get_sla_misses(ti, session):
+    """
+    Get all SLA misses that match a particular TaskInstance. There may be
+    several matches if the Task has several independent SLAs.
+    """
+    SM = airflow.models.SlaMiss
+    return session.query(SM).filter(
+        SM.dag_id == ti.dag_id,
+        SM.task_id == ti.task_id,
+        SM.execution_date == ti.execution_date
+    ).all()
+
+
+def create_sla_misses(ti, timestamp, session):
+    """
+    Determine whether a TaskInstance has missed any SLAs as of a provided
+    timestamp. If it has, create `SlaMiss` objects in the provided session.
+    Note that one TaskInstance can have multiple SLA miss objects: for example,
+    it can both start late and run longer than expected.
+    """
+    # Skipped task instances will never trigger SLAs because they
+    # were intentionally not scheduled. Though, it's still a valid and
+    # interesting SLA miss if a task that's *going* to be skipped today is
+    # late! That could mean that an upstream task is hanging.
+    if ti.state == State.SKIPPED:
+        return
+
+    log.debug("Calculating SLA misses for %s as of %s", ti, timestamp)
+
+    SM = airflow.models.SlaMiss
+
+    # Get existing SLA misses for this task instance.
+    ti_misses = {sm.sla_type: sm for sm in get_sla_misses(ti, session)}
+
+    # Calculate SLA misses that don't already exist. Wrapping exceptions here
+    # is important so that an exception in one type of SLA doesn't
+    # prevent other task SLAs from getting triggered.
+
+    # SLA Miss for Expected Duration
+    # n.b. - this one can't be calculated until the ti has started!
+    if SM.TASK_DURATION_EXCEEDED not in ti_misses \
+            and ti.task.expected_duration and ti.start_date:
+        try:
+            if ti.state in State.finished():
+                duration = ti.end_date - ti.start_date
+            else:
+                # Use the current time, if the task is still running.
+                duration = timestamp - ti.start_date
+
+            if duration > ti.task.expected_duration:
+                log.debug("Task instance %s's duration of %s > its expected "
+                          "duration of %s. Creating duration exceeded SLA miss.",
+                          ti, duration, ti.task.expected_duration)
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_DURATION_EXCEEDED,
+                    timestamp=timestamp))
+            else:
+                log.debug("Task instance %s's duration of %s <= its expected "
+                          "duration of %s, SLA not yet missed.",
+                          ti, duration, ti.task.expected_duration)
+        except Exception:  # pylint: disable=broad-except
+            log.exception(
+                "Failed to calculate expected duration SLA miss for "
+                "task instance %s",
+                ti
+            )
+
+    # SLA Miss for Expected Start
+    if SM.TASK_LATE_START not in ti_misses and ti.task.expected_start:
+        try:
+            # If a TI's exc date is 01-01-2018, we expect it to start by the next
+            # execution date (01-02-2018) plus a delta of expected_start.
+            expected_start = ti.task.dag.following_schedule(ti.execution_date)
+            expected_start += ti.task.expected_start
+
+            # The case where we have started the ti, but late
+            if ti.start_date and ti.start_date > expected_start:
+                log.debug("Task instance %s's actual start %s > its expected "
+                          "start of %s. Creating late start SLA miss.",
+                          ti, ti.start_date, expected_start)
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_START,
+                    timestamp=timestamp))
+
+            # The case where we haven't even started the ti yet
+            elif timestamp > expected_start:
+                log.debug("Task instance %s has not started by its expected "
+                          "start of %s. Creating late start SLA miss.",
+                          ti, expected_start)
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_START,
+                    timestamp=timestamp))
+            else:
+                log.debug("Task instance %s's expected start of %s hasn't "
+                          "happened yet, SLA not yet missed.",
+                          ti, expected_start)
+        except Exception:  # pylint: disable=broad-except
+            log.exception(
+                "Failed to calculate expected start SLA miss for "
+                "task instance %s",
+                ti
+            )
+
+    # SLA Miss for Expected Finish
+    if SM.TASK_LATE_FINISH not in ti_misses and ti.task.expected_finish:
+        try:
+            # If a TI's exc date is 01-01-2018, we expect it to finish by the next
+            # execution date (01-02-2018) plus a delta of expected_finish.
+            expected_finish = ti.task.dag.following_schedule(ti.execution_date)
+            expected_finish += ti.task.expected_finish
+
+            if ti.end_date and ti.end_date > expected_finish:
+                log.debug("Task instance %s's actual finish %s > its expected "
+                          "finish of %s. Creating late finish SLA miss.",
+                          ti, ti.end_date, expected_finish)
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_FINISH,
+                    timestamp=timestamp))
+
+            elif timestamp > expected_finish:
+                log.debug("Task instance %s has not finished by its expected "
+                          "finish of %s. Creating late finish SLA miss.",
+                          ti, expected_finish)
+                session.merge(SM(
+                    task_id=ti.task_id,
+                    dag_id=ti.dag_id,
+                    execution_date=ti.execution_date,
+                    sla_type=SM.TASK_LATE_FINISH,
+                    timestamp=timestamp))
+            else:
+                log.debug("Task instance %s's expected finish of %s hasn't "
+                          "happened yet, SLA not yet missed.",
+                          ti, expected_finish)
+        except Exception:  # pylint: disable=broad-except
+            log.exception(
+                "Failed to calculate expected finish SLA miss for "
+                "task instance %s",
+                ti
+            )
+
+
+def send_sla_miss_email(context):
+    """
+    Send an SLA miss email. This is the default SLA miss callback.
+    """
+    sla_miss = context["sla_miss"]
+
+    if sla_miss.sla_type == sla_miss.TASK_DURATION_EXCEEDED:
+        email_function = send_task_duration_exceeded_email
+    elif sla_miss.sla_type == sla_miss.TASK_LATE_START:
+        email_function = send_task_late_start_email
+    elif sla_miss.sla_type == sla_miss.TASK_LATE_FINISH:
+        email_function = send_task_late_finish_email
+    else:
+        log.warning("Received unexpected SLA Miss type: %s", sla_miss.sla_type)
+        return
+
+    email_to, email_subject, email_body = email_function(context)
+    send_email(email_to, email_subject, email_body)
+
+
+def describe_task_instance(ti):
+    """
+    Return a string representation of the task instance.
+    """
+    return "{dag_id}.{task_id} [{exc_date}]".format(
+        dag_id=ti.dag_id,
+        task_id=ti.task_id,
+        exc_date=ti.execution_date
+    )

Review comment:
       TaskInstance already has a `__repr__()` for this. Let's remove it.
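
       For illustration, a minimal sketch of a call site that relies on the
       TaskInstance's built-in string representation instead of the helper
       (the function and message below are hypothetical, not code from this PR):

       ```python
       import logging

       log = logging.getLogger(__name__)


       def notify_late_start(ti):
           # %s uses str(), which falls back to TaskInstance's __repr__,
           # so no describe_task_instance() helper is needed.
           log.debug("Creating late start SLA miss for %s", ti)
           # str()/format() behave the same way when an explicit string is
           # needed, e.g. for an email subject line.
           return "[airflow] SLA miss on {}".format(ti)
       ```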

##########
File path: airflow/utils/sla.py
##########
@@ -0,0 +1,500 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+from six import string_types
+from sqlalchemy import or_
+
+import airflow.models  # pylint: disable=cyclic-import
+from airflow.utils import asciiart
+from airflow.utils.email import send_email
+from airflow.utils.session import provide_session
+from airflow.utils.state import State
+
+log = logging.getLogger(__name__)
+
+
+def yield_unscheduled_runs(dag, last_scheduled_run, ts):

Review comment:
       Can we add a docstring explaining the meaning of `ts`?
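
       One possible shape for that docstring; the exact meaning of `ts` is
       inferred from how the helper is used during SLA checks, so treat the
       wording below as a sketch rather than final text:

       ```python
       def yield_unscheduled_runs(dag, last_scheduled_run, ts):
           """
           Yield placeholder DagRuns that the DAG's schedule calls for but
           that the scheduler has not created yet.

           :param dag: the DAG whose schedule is being walked
           :param last_scheduled_run: the most recent DagRun actually created
               for this DAG, or None if there is none yet
           :param ts: the "as of" timestamp; schedule intervals that should
               have produced a run at or before this time are yielded
           """
           ...  # body unchanged
       ```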







[GitHub] [airflow] seanxwzhang commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-634199947


   CI seems to be having trouble
   ```
   unable to recognize "/opt/airflow/scripts/ci/in_container/kubernetes/app/postgres.yaml": Get https://airflow-python-3.6-v1.15.3-control-plane:6443/api?timeout=32s: dial tcp: lookup airflow-python-3.6-v1.15.3-control-plane on 127.0.0.11:53: no such host
   ```
   I'll re-push, but I'm flagging it here in case it indicates that something is off.





[GitHub] [airflow] seanxwzhang commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
seanxwzhang commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-626397934


   > Option 1 doesn't guarantee correctness right? i.e. if there are more dagruns that need to be checked than the preset limit, some of them will be ignored?
   
   True. I guess the way to do it (if no additional column is added) would be to remove the fixed count and simply do:
   
   ```
   scheduled_dagruns = (
               session.query(DR)
               .filter(DR.dag_id == self.dag_id)
               .filter(DR.run_id.notlike(f"{DagRunType.BACKFILL_JOB.value}__%"))
               .filter(DR.external_trigger == False)
               .filter(DR.state != State.SUCCESS)
               .order_by(desc(DR.execution_date))
               .all()
           )
   ```
   
   This way we only get DRs that have yet to succeed (since we assume that successful DRs are free from SLA checks).
   
   > With regards to performance comparison between option 1 and option 2, aren't we already checking all the TIs for the 100 fetched dag runs in option 1?
   
   We are checking whether these TIs **are violating SLAs**, not whether these TIs **are free from SLAs**; those are different checks (e.g., to check if a TI violates *expected_duration*, we compare the current duration with the SLA; to check if a TI is free from SLA violations, we assert that the TI has finished within the *expected_duration*; see the sketch below). Doing so would also require adding another column to TI.
   
   I'm slightly inclined towards option 1 (we'd probably need to remove the fixed limit of 100), but I'm definitely open to other opinions. :)
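
   To make the distinction concrete, here is a rough sketch of the two kinds of checks for the *expected_duration* SLA (the function names are illustrative, not actual helpers in this PR):

   ```python
   def violates_expected_duration(ti, now):
       """Check whether this TI has already run longer than expected_duration."""
       if not (ti.task.expected_duration and ti.start_date):
           return False
       end = ti.end_date or now  # a still-running TI is measured against "now"
       return (end - ti.start_date) > ti.task.expected_duration


   def free_from_expected_duration(ti):
       """Check whether this TI can never violate expected_duration again.

       Only true once the TI has finished within the allowed duration; a
       running TI might still exceed the SLA later, so it is not "free" yet.
       """
       if not ti.task.expected_duration:
           return True
       if not (ti.start_date and ti.end_date):
           return False
       return (ti.end_date - ti.start_date) <= ti.task.expected_duration
   ```

   Marking a DagRun as *sla_checked* would require the second kind of check to pass, for every SLA type, for every TI in the run, which is why option 2 is more expensive than it first appears.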





[GitHub] [airflow] houqp commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

Posted by GitBox <gi...@apache.org>.
houqp commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-626404118


   If we can add `.filter(DR.state != State.SUCCESS)` to the query filter list, then I am also in favor of option 1. It's simpler than option 2 and shouldn't run into performance issues for the majority of use cases.

