Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/27 08:37:56 UTC

[GitHub] [airflow] Acehaidrey opened a new pull request #9544: Add metric for scheduling delay between first run task & expected start time

Acehaidrey opened a new pull request #9544:
URL: https://github.com/apache/airflow/pull/9544


           This PR emits the true scheduling delay stat, defined as the time at which the first
           task in the DAG starts minus the expected DAG run start time. The method is called from update_state
           when the state of the DagRun is updated to a completed status (either success or failure).
           It finds the first started task within the DAG, calculates the expected DagRun start time
           (based on dag.execution_date & dag.schedule_interval), and subtracts the latter from the former to get the delay.
   
           The emitted data may contain outliers (e.g. when the first task was cleared, so the second task's start_date
           is used), but we can get rid of the outliers on the stats side through the dashboards.
   
           Note that the stat is only emitted if the DagRun is scheduler-triggered (i.e. external_trigger is False).
   
   
   This is useful when you want to measure executor delays or weigh adding more capacity, more scheduling processes, etc. The delay between the DagRun's actual start time and its expected start time is only one measurement; the delay between the DAG run and its first task actually running is important as well.
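
As a rough illustration of the arithmetic described above, here is a minimal sketch using only the standard library. The function name and the sample dates are made up for the example, and it assumes the expected start time is simply execution_date plus a fixed schedule_interval, which may not match how the PR derives it.

```python
from datetime import datetime, timedelta, timezone


def true_scheduling_delay(execution_date, schedule_interval, first_task_start):
    """Return seconds between the expected run start and the first task start."""
    # Assumption: a scheduled run for `execution_date` is expected to start
    # once its interval has elapsed, i.e. at execution_date + schedule_interval.
    expected_start = execution_date + schedule_interval
    return (first_task_start - expected_start).total_seconds()


execution_date = datetime(2020, 6, 26, tzinfo=timezone.utc)
first_task_start = datetime(2020, 6, 27, 0, 0, 42, tzinfo=timezone.utc)
delay = true_scheduling_delay(execution_date, timedelta(days=1), first_task_start)
print(delay)  # 42.0
```

A cleared-and-rerun first task would show up here as an unusually large value, which is the outlier case mentioned above.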
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] Acehaidrey commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-669723733


   @mik-laj thank you for all the help. I will get on this next week. To be honest, I had a family loss due to COVID and have been away for 3 weeks. I want to close this out.
   
   Just to be sure - adding the tests you have provided me, and making sure the filter gets just the single value and does not return the full record. Those are the 2 action items - is there anything else?





[GitHub] [airflow] Acehaidrey commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r452511482



##########
File path: airflow/models/dagrun.py
##########
@@ -411,6 +412,44 @@ def _are_premature_tis(
                 return True
         return False
 
+    @provide_session
+    def _emit_true_scheduling_delay_stats_for_finished_state(self, session=None):
+        """
+        This is a helper method to emit the true scheduling delay stats, which is defined as
+        the time when the first task in DAG starts minus the expected DAG run datetime.
+        This method will be used in the update_state method when the state of the DagRun
+        is updated to a completed status (either success or failure). The method will find the first
+        started task within the DAG and calculate the expected DagRun start time (based on
+        dag.execution_date & dag.schedule_interval), and minus these two to get the delay.
+
+        The emitted data may contains outlier (e.g. when the first task was cleared, so
+        the second task's start_date will be used), but we can get ride of the the outliers
+        on the stats side through the dashboards.
+
+        Note, the stat will only be emitted if the DagRun is a scheduler triggered one
+        (i.e. external_trigger is False).
+        """
+        if self.state == State.RUNNING:
+            return
+
+        try:
+            if self.external_trigger:
+                return
+            # Get the task that has the earliest start_date
+            qry = session.query(TI).filter(

Review comment:
       I just need the actual start time, so I can narrow the query down to that single value instead of the full record. Will add that improvement, thanks!
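
For illustration, a minimal sketch of fetching only the earliest start time rather than a whole row. It uses the standard-library sqlite3 module in place of the SQLAlchemy session from the diff above, and the table layout and values are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "task_id TEXT, dag_id TEXT, execution_date TEXT, start_date TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("task_a", "example_dag", "2020-06-26T00:00:00", "2020-06-27T00:01:10"),
        ("task_b", "example_dag", "2020-06-26T00:00:00", "2020-06-27T00:00:42"),
        ("task_c", "example_dag", "2020-06-26T00:00:00", None),  # never started
        ("task_a", "other_dag", "2020-06-26T00:00:00", "2020-06-27T00:00:05"),
    ],
)

# Ask the database for a single aggregated column, not full records.
# ISO-8601 strings sort chronologically, so MIN() yields the earliest start.
row = conn.execute(
    "SELECT MIN(start_date) FROM task_instance "
    "WHERE dag_id = ? AND execution_date = ? AND start_date IS NOT NULL",
    ("example_dag", "2020-06-26T00:00:00"),
).fetchone()
first_start = row[0]
print(first_start)  # 2020-06-27T00:00:42
```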







[GitHub] [airflow] Acehaidrey commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-701907625


   Hi all - apologies. I am back now, and will complete these updates this weekend to get it in. 





[GitHub] [airflow] stale[bot] commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
stale[bot] commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-695759675


   This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
   





[GitHub] [airflow] Acehaidrey edited a comment on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey edited a comment on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-725882481


   Hey all, sorry for all the delays. I am working on it now. Thanks for bearing with me! It turns out we can use the `TISchedulingDecision` object and don't need to make another DB call, which is a great addition, and this PR reflects that. Please review when you have a chance (waiting for the build first).





[GitHub] [airflow] turbaszek commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-705518977


   @Acehaidrey is there anything we can do to help you with finishing this PR?





[GitHub] [airflow] Acehaidrey commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-653242280


   @mik-laj please take a look when you get a chance





[GitHub] [airflow] turbaszek commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r446847248



##########
File path: airflow/models/dagrun.py
##########
@@ -411,6 +412,44 @@ def _are_premature_tis(
                 return True
         return False
 
+    @provide_session
+    def _emit_true_scheduling_delay_stats_for_finished_state(self, session=None):
+        """
+        This is a helper method to emit the true scheduling delay stats, which is defined as
+        the time when the first task in DAG starts minus the expected DAG run datetime.
+        This method will be used in the update_state method when the state of the DagRun
+        is updated to a completed status (either success or failure). The method will find the first
+        started task within the DAG and calculate the expected DagRun start time (based on
+        dag.execution_date & dag.schedule_interval), and minus these two to get the delay.
+
+        The emitted data may contains outlier (e.g. when the first task was cleared, so
+        the second task's start_date will be used), but we can get ride of the the outliers
+        on the stats side through the dashboards.
+
+        Note, the stat will only be emitted if the DagRun is a scheduler triggered one
+        (i.e. external_trigger is False).
+        """
+        if self.state == State.RUNNING:
+            return
+
+        try:
+            if self.external_trigger:
+                return
+            # Get the task that has the earliest start_date
+            qry = session.query(TI).filter(
+                TI.dag_id == self.dag_id,
+                TI.execution_date == self.execution_date,
+                TI.start_date.isnot(None),
+            ).order_by(TI.start_date.asc())
+            ti = qry.first()
+            dag = self.get_dag()
+            if ti and dag:

Review comment:
       Would it be possible to get the date from `dag.tasks` without additional query?







[GitHub] [airflow] mik-laj commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r522836502



##########
File path: airflow/models/dagrun.py
##########
@@ -565,6 +566,40 @@ def _are_premature_tis(
                 return True
         return False
 
+    def _emit_true_scheduling_delay_stats_for_finished_state(self, finished_tis):
+        """
+        This is a helper method to emit the true scheduling delay stats, which is defined as
+        the time when the first task in DAG starts minus the expected DAG run datetime.
+        This method will be used in the update_state method when the state of the DagRun
+        is updated to a completed status (either success or failure). The method will find the first
+        started task within the DAG and calculate the expected DagRun start time (based on
+        dag.execution_date & dag.schedule_interval), and minus these two values to get the delay.
+        The emitted data may contains outlier (e.g. when the first task was cleared, so
+        the second task's start_date will be used), but we can get rid of the the outliers
+        on the stats side through the dashboards tooling built.
+        Note, the stat will only be emitted if the DagRun is a scheduler triggered one
+        (i.e. external_trigger is False).
+        """
+        try:
+            if self.state == State.RUNNING:
+                return
+            if self.external_trigger:
+                return
+            if not finished_tis:
+                return
+            dag = self.get_dag()
+            ordered_tis_by_start_date = [ti for ti in finished_tis if ti.start_date]
+            ordered_tis_by_start_date.sort(key=lambda ti: ti.start_date, reverse=False)
+            first_start_date = getattr(ordered_tis_by_start_date[0], 'start_date', None)

Review comment:
       I don't understand why `getattr` is used here. Can you tell me more about it?
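
One possible way to sidestep the question, sketched with the standard library only. This is not the PR's code: `FakeTaskInstance` and `earliest_start_date` are hypothetical names standing in for the real task instance objects. Since the list is already filtered to entries with a start_date, plain attribute access is enough, and `min(..., default=None)` also covers the case where nothing has started, where indexing `[0]` on an empty list would raise IndexError.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; only start_date matters here."""
    task_id: str
    start_date: Optional[datetime]


def earliest_start_date(finished_tis: Iterable[FakeTaskInstance]) -> Optional[datetime]:
    # Skip task instances that never started; default=None handles the case
    # where no task instance has a start_date at all.
    started = (ti.start_date for ti in finished_tis if ti.start_date is not None)
    return min(started, default=None)


tis = [
    FakeTaskInstance("never_started", None),
    FakeTaskInstance("second", datetime(2020, 6, 27, 0, 5)),
    FakeTaskInstance("first", datetime(2020, 6, 27, 0, 1)),
]
print(earliest_start_date(tis))  # 2020-06-27 00:01:00
print(earliest_start_date([]))   # None
```

This also avoids sorting the whole list when only the minimum is needed.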







[GitHub] [airflow] Acehaidrey commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-726594295


   @ryw @potiuk @turbaszek (mik-laj, I know you're busy, so feel free to ignore, and thanks for all your help): could you all review this once more when you get a chance? I finally wrapped it up and overhauled it.
   
   I have a few other PRs I want to run by you all after this, too!





[GitHub] [airflow] Acehaidrey commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r450636937



##########
File path: tests/models/test_dagrun.py
##########
@@ -642,3 +642,27 @@ def test_wait_for_downstream(self, prev_ti_state, is_ti_success):
         ti.set_state(State.QUEUED)
         ti.run()
         self.assertEqual(ti.state == State.SUCCESS, is_ti_success)
+
+    @mock.patch.object(DagRun, '_emit_true_scheduling_delay_stats_for_finished_state')
+    def test_dag_stats(self, stats_mock):
+        """
+        Tests that dag scheduling delay stat is called.
+        """
+        dag = DAG(
+            dag_id='test_dagrun_stats',
+            start_date=timezone.datetime(2017, 1, 1)
+        )
+        dag_task = DummyOperator(
+            task_id='dummy1',
+            dag=dag
+        )
+
+        initial_task_states = {
+            dag_task.task_id: State.SUCCESS,
+        }
+
+        dag_run = self.create_dag_run(dag=dag,
+                                      state=State.RUNNING,
+                                      task_states=initial_task_states)
+        dag_run.update_state()
+        stats_mock.assert_called()

Review comment:
       Hi @turbaszek, thank you for this. One question, in case you know: the time value changes between runs, so can I use assert_called_once_with and have that value match anything?
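
For what it's worth, `unittest.mock.ANY` compares equal to any value, so it can stand in for an argument that varies between runs. A minimal sketch follows; `FakeStats`, `emit_delay` and the metric name are hypothetical and only serve the illustration, they are not taken from the PR.

```python
from unittest import mock


class FakeStats:
    """Hypothetical metrics client, used only for this illustration."""

    def timing(self, name, value):
        pass


def emit_delay(stats, dag_id, delay_seconds):
    # Illustrative metric name, not necessarily the one the PR emits.
    stats.timing(f"dagrun.{dag_id}.first_task_scheduling_delay", delay_seconds)


stats = mock.Mock(spec=FakeStats)
emit_delay(stats, "test_dagrun_stats", 12.345)

# The metric name is checked exactly; mock.ANY matches whatever delay was sent.
stats.timing.assert_called_once_with(
    "dagrun.test_dagrun_stats.first_task_scheduling_delay", mock.ANY
)
```

If the value should still be sanity-checked, it can be read back from `stats.timing.call_args` and compared against a range.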







[GitHub] [airflow] Acehaidrey commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r447208601



##########
File path: airflow/models/dagrun.py
##########
@@ -411,6 +412,44 @@ def _are_premature_tis(
                 return True
         return False
 
+    @provide_session
+    def _emit_true_scheduling_delay_stats_for_finished_state(self, session=None):
+        """
+        This is a helper method to emit the true scheduling delay stats, which is defined as
+        the time when the first task in DAG starts minus the expected DAG run datetime.
+        This method will be used in the update_state method when the state of the DagRun
+        is updated to a completed status (either success or failure). The method will find the first
+        started task within the DAG and calculate the expected DagRun start time (based on
+        dag.execution_date & dag.schedule_interval), and minus these two to get the delay.
+
+        The emitted data may contains outlier (e.g. when the first task was cleared, so
+        the second task's start_date will be used), but we can get ride of the the outliers
+        on the stats side through the dashboards.
+
+        Note, the stat will only be emitted if the DagRun is a scheduler triggered one
+        (i.e. external_trigger is False).
+        """
+        if self.state == State.RUNNING:
+            return
+
+        try:
+            if self.external_trigger:
+                return
+            # Get the task that has the earliest start_date
+            qry = session.query(TI).filter(
+                TI.dag_id == self.dag_id,
+                TI.execution_date == self.execution_date,
+                TI.start_date.isnot(None),
+            ).order_by(TI.start_date.asc())
+            ti = qry.first()
+            dag = self.get_dag()
+            if ti and dag:

Review comment:
       If there is any way to get that information without doing the query I am all ears to make that change!







[GitHub] [airflow] potiuk edited a comment on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-726648542


   Cool! Some static checks are failing :).  I recommend pre-commit installation :)





[GitHub] [airflow] mik-laj commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-700656427


   > Just to be sure - adding the tests you have provided me, and making sure the filter gets just the single value and does not return the full record. Those are the 2 action items - is there anything else?
   
   It seems to me that you do not need to fetch new data from the database at all. You can get it from the function one level higher. Can you check? If so, you only need to pass this data to your function. You will also not need to add new tests, as the number of queries will not change.





[GitHub] [airflow] Acehaidrey commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r447208060



##########
File path: airflow/models/dagrun.py
##########
@@ -411,6 +412,44 @@ def _are_premature_tis(
                 return True
         return False
 
+    @provide_session
+    def _emit_true_scheduling_delay_stats_for_finished_state(self, session=None):
+        """
+        This is a helper method to emit the true scheduling delay stats, which is defined as
+        the time when the first task in DAG starts minus the expected DAG run datetime.
+        This method will be used in the update_state method when the state of the DagRun
+        is updated to a completed status (either success or failure). The method will find the first
+        started task within the DAG and calculate the expected DagRun start time (based on
+        dag.execution_date & dag.schedule_interval), and minus these two to get the delay.
+
+        The emitted data may contains outlier (e.g. when the first task was cleared, so
+        the second task's start_date will be used), but we can get ride of the the outliers
+        on the stats side through the dashboards.
+
+        Note, the stat will only be emitted if the DagRun is a scheduler triggered one
+        (i.e. external_trigger is False).
+        """
+        if self.state == State.RUNNING:
+            return
+
+        try:
+            if self.external_trigger:
+                return
+            # Get the task that has the earliest start_date
+            qry = session.query(TI).filter(
+                TI.dag_id == self.dag_id,
+                TI.execution_date == self.execution_date,
+                TI.start_date.isnot(None),
+            ).order_by(TI.start_date.asc())
+            ti = qry.first()
+            dag = self.get_dag()
+            if ti and dag:

Review comment:
       So dag.tasks doesn't have the start_date that the task actually began with. That is saved in the DB; it is not the start_date assigned in the default args. The intent is to get the real time at which the first task begins, as opposed to the DagRun start time.







[GitHub] [airflow] Acehaidrey commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-653705002


   Sorry to keep pinging, @ashb. Please take a look if you get a chance.





[GitHub] [airflow] github-actions[bot] commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-726658882


   The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!





[GitHub] [airflow] mik-laj commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-655818454


   Can you add this test case to avoid regression in number of queries?
   ```python
       @provide_session
       def test_process_dags_queries_count_after_finish_dag_run(self, session):
           with mock.patch.dict("os.environ", {
               "PERF_DAGS_COUNT": "3",
               "PERF_TASKS_COUNT": "20",
               "PERF_START_AGO": "1d",
               "PERF_SCHEDULE_INTERVAL": "16h",
               "PERF_SHAPE": "grid",
           }), conf_vars({
               ('scheduler', 'use_job_schedule'): 'True',
           }):
               dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False)
               processor = DagFileProcessor([], mock.MagicMock())
   
               # Create new DAG Runs
               with assert_queries_count(28):
                   processor._process_dags(dagbag.dags.values())
   
               self.assertEqual(session.query(DagRun).count(), 3)
               self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 3)
   
               # No new DAG Run
               with assert_queries_count(19):
                   processor._process_dags(dagbag.dags.values())
   
               self.assertEqual(session.query(DagRun).count(), 3)
               self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 3)
   
               session.query(TaskInstance).update({
                   "state": State.SUCCESS,
                   "start_date": timezone.utcnow(),
                   "end_date": timezone.utcnow(),
                   "duration": 0,
               })
   
               # Finish Dag Runs
               with assert_queries_count(19):
                   processor._process_dags(dagbag.dags.values())
   
               self.assertEqual(session.query(DagRun).count(), 3)
               self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 0)
   
               # No new DAG Runs
               with assert_queries_count(7):
                   processor._process_dags(dagbag.dags.values())
   
               self.assertEqual(session.query(DagRun).count(), 3)
               self.assertEqual(session.query(DagRun).filter(DagRun.state == State.RUNNING).count(), 0)
   ```





[GitHub] [airflow] mik-laj commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-655812648


   I have the answer to the 3rd question. We only test cases where the DAG Runs are still running.





[GitHub] [airflow] Acehaidrey commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-652644579


   @mik-laj can you please take a look at this when you get a chance.
   Also @ashb if you have time too





[GitHub] [airflow] potiuk commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-706434447


   Yeah, I'd love to merge this one. I had a ~1 hr discussion today on similar metrics, so it is needed :)





[GitHub] [airflow] Acehaidrey commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-727024920


   Just updated again. Thank you for the comments. Got the fixes in.





[GitHub] [airflow] turbaszek commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r450659520



##########
File path: tests/models/test_dagrun.py
##########
@@ -642,3 +642,27 @@ def test_wait_for_downstream(self, prev_ti_state, is_ti_success):
         ti.set_state(State.QUEUED)
         ti.run()
         self.assertEqual(ti.state == State.SUCCESS, is_ti_success)
+
+    @mock.patch.object(DagRun, '_emit_true_scheduling_delay_stats_for_finished_state')
+    def test_dag_stats(self, stats_mock):
+        """
+        Tests that dag scheduling delay stat is called.
+        """
+        dag = DAG(
+            dag_id='test_dagrun_stats',
+            start_date=timezone.datetime(2017, 1, 1)
+        )
+        dag_task = DummyOperator(
+            task_id='dummy1',
+            dag=dag
+        )
+
+        initial_task_states = {
+            dag_task.task_id: State.SUCCESS,
+        }
+
+        dag_run = self.create_dag_run(dag=dag,
+                                      state=State.RUNNING,
+                                      task_states=initial_task_states)
+        dag_run.update_state()
+        stats_mock.assert_called()

Review comment:
       You can either use `mock.ANY` as this argument or mock more methods so the value is deterministic. In this case I'm OK with the first approach 😉
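
For illustration, a minimal sketch of the first approach using only `unittest.mock` from the standard library. The `Reporter` class, its `emit` and `finish` methods, and the metric name are hypothetical stand-ins, not Airflow code; the point is only that `mock.ANY` lets the assertion pin the stable arguments while ignoring a time-dependent one.

```python
from unittest import mock


class Reporter:
    """Hypothetical stand-in for an object that emits a timing metric."""

    def emit(self, name, value):
        raise NotImplementedError("replaced by a mock in the test")

    def finish(self, elapsed_seconds):
        # The value depends on wall-clock time, so a test cannot predict it.
        self.emit("first_task_scheduling_delay", elapsed_seconds)


def check_emit_called_once():
    with mock.patch.object(Reporter, "emit") as emit_mock:
        Reporter().finish(elapsed_seconds=12.345)
    # mock.ANY compares equal to anything, so only the metric name is pinned.
    emit_mock.assert_called_once_with("first_task_scheduling_delay", mock.ANY)
    return emit_mock.call_count
```

The second approach (mocking the clock or the inputs so the value is fixed) gives a stricter test at the cost of more setup.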
   







[GitHub] [airflow] mik-laj commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r451883526



##########
File path: airflow/models/dagrun.py
##########
@@ -411,6 +412,44 @@ def _are_premature_tis(
                 return True
         return False
 
+    @provide_session
+    def _emit_true_scheduling_delay_stats_for_finished_state(self, session=None):
+        """
+        This is a helper method to emit the true scheduling delay stats, which is defined as
+        the time when the first task in DAG starts minus the expected DAG run datetime.
+        This method will be used in the update_state method when the state of the DagRun
+        is updated to a completed status (either success or failure). The method will find the first
+        started task within the DAG and calculate the expected DagRun start time (based on
+        dag.execution_date & dag.schedule_interval), and subtract the two to get the delay.
+
+        The emitted data may contain outliers (e.g. when the first task was cleared, so
+        the second task's start_date will be used), but we can get rid of the outliers
+        on the stats side through the dashboards.
+
+        Note, the stat will only be emitted if the DagRun is a scheduler triggered one
+        (i.e. external_trigger is False).
+        """
+        if self.state == State.RUNNING:
+            return
+
+        try:
+            if self.external_trigger:
+                return
+            # Get the task that has the earliest start_date
+            qry = session.query(TI).filter(

Review comment:
       ```suggestion
               qry = session.query(TI.start_date).filter(
   ```
   Do you need all attributes?







[GitHub] [airflow] stale[bot] closed pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
stale[bot] closed pull request #9544:
URL: https://github.com/apache/airflow/pull/9544


   





[GitHub] [airflow] Acehaidrey commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-725882481


   Hey all, sorry for all the delays. I'm working on it now and will have the fixes in by tomorrow. Thanks for bearing with me! I see that we can use the `TISchedulingDecision` object.





[GitHub] [airflow] kosteev commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
kosteev commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-707172392


   Hi guys!
   What do you think about emitting this metric when a DAG has multiple first-task instances, e.g. 100 tasks that have no dependencies on each other, so they all execute in parallel?
   
   In my opinion, it would be great if we could emit the metric for each such task instance, broken down by `task_id`. Otherwise this metric would be a bit misleading for these DAGs, and if we emit it for all first-task instances we could build max/p95/avg/... aggregations to monitor scheduler delays.
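
A minimal sketch of the per-task idea, assuming the expected run start and each first task's `start_date` are already known. The function names, the sample task ids, and the nearest-rank p95 are illustrative choices, not part of the PR.

```python
import math
import statistics
from datetime import datetime, timedelta


def first_task_delays(expected_start, start_dates_by_task):
    """Return {task_id: delay in seconds} for tasks that actually started."""
    return {
        task_id: (started - expected_start).total_seconds()
        for task_id, started in start_dates_by_task.items()
        if started is not None
    }


def summarize(delays):
    """Aggregate per-task delays into max / p95 / avg (nearest-rank p95)."""
    values = sorted(delays.values())
    rank = math.ceil(0.95 * len(values))  # 1-based nearest-rank index
    return {
        "max": values[-1],
        "p95": values[rank - 1],
        "avg": statistics.mean(values),
    }


expected = datetime(2020, 1, 2)
starts = {
    "task_a": expected + timedelta(seconds=5),
    "task_b": expected + timedelta(seconds=20),
    "task_c": expected + timedelta(seconds=65),
    "task_d": None,  # never started, so it is skipped
}
delays = first_task_delays(expected, starts)
summary = summarize(delays)
```

In practice the aggregation would more likely happen in the metrics backend, with each delay emitted as a separate timing tagged or named by `task_id`; `summarize` assumes at least one task has started.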





[GitHub] [airflow] ashb commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r452392682



##########
File path: airflow/models/dagrun.py
##########
@@ -411,6 +412,44 @@ def _are_premature_tis(
                 return True
         return False
 
+    @provide_session
+    def _emit_true_scheduling_delay_stats_for_finished_state(self, session=None):
+        """
+        This is a helper method to emit the true scheduling delay stats, which is defined as
+        the time when the first task in DAG starts minus the expected DAG run datetime.
+        This method will be used in the update_state method when the state of the DagRun
+        is updated to a completed status (either success or failure). The method will find the first
+        started task within the DAG and calculate the expected DagRun start time (based on
+        dag.execution_date & dag.schedule_interval), and subtract the two to get the delay.
+
+        The emitted data may contain outliers (e.g. when the first task was cleared, so
+        the second task's start_date will be used), but we can get rid of the outliers
+        on the stats side through the dashboards.
+
+        Note, the stat will only be emitted if the DagRun is a scheduler triggered one
+        (i.e. external_trigger is False).
+        """
+        if self.state == State.RUNNING:
+            return
+
+        try:
+            if self.external_trigger:
+                return
+            # Get the task that has the earliest start_date
+            qry = session.query(TI).filter(
+                TI.dag_id == self.dag_id,
+                TI.execution_date == self.execution_date,
+                TI.start_date.isnot(None),
+            ).order_by(TI.start_date.asc())
+            ti = qry.first()
+            dag = self.get_dag()
+            if ti and dag:

Review comment:
       The function you call this from already has the TIs, I think, so you could pass them in.
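
A rough sketch of what passing them in could look like, assuming the caller already holds the task instances in memory. `LoadedTI` and `earliest_start` are hypothetical names used only for this example; the real objects would be Airflow `TaskInstance` rows.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional


@dataclass
class LoadedTI:
    """Hypothetical stand-in for a task instance already held in memory."""
    task_id: str
    start_date: Optional[datetime]


def earliest_start(tis: Iterable[LoadedTI]) -> Optional[datetime]:
    """Return the earliest non-null start_date, or None if nothing started."""
    started = [ti.start_date for ti in tis if ti.start_date is not None]
    return min(started, default=None)
```

This replaces the `ORDER BY start_date ... first()` query with a linear scan over data that has already been fetched, so no extra round trip to the database is needed.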







[GitHub] [airflow] potiuk commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-669779699


   Sincere condolences @Acehaidrey 





[GitHub] [airflow] mik-laj commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-662178011


   @Acehaidrey Is everything okay? Do you need any help?





[GitHub] [airflow] Acehaidrey commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-650947603


   @jhtimmins @ashb @turbaszek mind taking a look when you get a chance?





[GitHub] [airflow] potiuk merged pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #9544:
URL: https://github.com/apache/airflow/pull/9544


   





[GitHub] [airflow] ryw commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
ryw commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-717506105


   @Acehaidrey would you be able to finish this PR, or would you like someone to take it over?





[GitHub] [airflow] potiuk commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-730341511


   @kaxil -> I think this one would be really nice to cherry-pick to 1.10.13. It's not big and I can cherry-pick it. WDYT?





[GitHub] [airflow] potiuk commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-726648542


   Cool! Some static checks are failing :). I recommend installing pre-commit :)





[GitHub] [airflow] Acehaidrey commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
Acehaidrey commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r452512724



##########
File path: airflow/models/dagrun.py
##########
@@ -411,6 +412,44 @@ def _are_premature_tis(
                 return True
         return False
 
+    @provide_session
+    def _emit_true_scheduling_delay_stats_for_finished_state(self, session=None):
+        """
+        This is a helper method to emit the true scheduling delay stats, which is defined as
+        the time when the first task in DAG starts minus the expected DAG run datetime.
+        This method will be used in the update_state method when the state of the DagRun
+        is updated to a completed status (either success or failure). The method will find the first
+        started task within the DAG and calculate the expected DagRun start time (based on
+        dag.execution_date & dag.schedule_interval), and subtract the two to get the delay.
+
+        The emitted data may contain outliers (e.g. when the first task was cleared, so
+        the second task's start_date will be used), but we can get rid of the outliers
+        on the stats side through the dashboards.
+
+        Note, the stat will only be emitted if the DagRun is a scheduler triggered one
+        (i.e. external_trigger is False).
+        """
+        if self.state == State.RUNNING:
+            return
+
+        try:
+            if self.external_trigger:
+                return
+            # Get the task that has the earliest start_date
+            qry = session.query(TI.start_date).filter(
+                TI.dag_id == self.dag_id,
+                TI.execution_date == self.execution_date,
+                TI.start_date.isnot(None),
+            ).order_by(TI.start_date.asc())
+            ti = qry.first()
+            dag = self.get_dag()
+            if ti and dag:
+                true_delay = (ti.start_date - dag.following_schedule(self.execution_date)).total_seconds()

Review comment:
       Need to make changes here to support the change just committed with @mik-laj.
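
For reference, the arithmetic on the last line of the hunk can be sketched with plain `datetime` values. This assumes a fixed `timedelta` schedule; `expected_start` is a simplified stand-in for `dag.following_schedule()`, which in Airflow also handles cron expressions.

```python
from datetime import datetime, timedelta


def expected_start(execution_date, schedule_interval):
    """Assumed stand-in for dag.following_schedule(): a run for a given
    execution_date is expected to start one interval later."""
    return execution_date + schedule_interval


def true_scheduling_delay(first_task_start, execution_date, schedule_interval):
    """Seconds between the expected run start and the first task's start."""
    return (first_task_start - expected_start(execution_date, schedule_interval)).total_seconds()


delay = true_scheduling_delay(
    first_task_start=datetime(2020, 1, 2, 0, 0, 42),
    execution_date=datetime(2020, 1, 1),
    schedule_interval=timedelta(days=1),
)
```

Here a daily run for 2020-01-01 is expected to start at midnight on 2020-01-02, and its first task starts 42 seconds after that.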







[GitHub] [airflow] mik-laj commented on pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#issuecomment-655809178


   I started looking at this change and I have three questions.
   1. What is the average difference you see between `dagrun.schedule_delay.<dag_id>` and `dagrun.<dag_id>.first_task_scheduling_delay`?
   2. From what I see, you fetch one task with one query. Have you tried to avoid it? It seems to me that you may already have this data in memory. Please look at: airflow/models/dagrun.py:295 (update_state method)
   3. Do you know why this change does not affect jobs.test_scheduler_job.TestDagFileProcessorQueriesCount? It seems to me that there should be a visible problem with too many queries, but for some reason it does not show up.
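
To make the query-count concern concrete, here is a small sketch of counting statements around a block of code. It uses the standard-library `sqlite3` trace hook rather than Airflow's own test helper, and `count_queries` and the table layout are hypothetical names chosen for the example.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def count_queries(conn):
    """Yield a list that collects every SQL statement run on `conn`."""
    statements = []
    conn.set_trace_callback(statements.append)
    try:
        yield statements
    finally:
        conn.set_trace_callback(None)  # stop tracing


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, start_date TEXT)")

with count_queries(conn) as seen:
    conn.execute("SELECT task_id FROM task_instance").fetchall()
    conn.execute("SELECT MIN(start_date) FROM task_instance").fetchall()

query_count = len(seen)
```

A test that asserts an exact count in this style only notices an extra query if the code path that issues it actually runs during the test.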





[GitHub] [airflow] turbaszek commented on a change in pull request #9544: Add metric for scheduling delay between first run task & expected start time

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9544:
URL: https://github.com/apache/airflow/pull/9544#discussion_r449756656



##########
File path: tests/models/test_dagrun.py
##########
@@ -642,3 +642,27 @@ def test_wait_for_downstream(self, prev_ti_state, is_ti_success):
         ti.set_state(State.QUEUED)
         ti.run()
         self.assertEqual(ti.state == State.SUCCESS, is_ti_success)
+
+    @mock.patch.object(DagRun, '_emit_true_scheduling_delay_stats_for_finished_state')
+    def test_dag_stats(self, stats_mock):
+        """
+        Tests that dag scheduling delay stat is called.
+        """
+        dag = DAG(
+            dag_id='test_dagrun_stats',
+            start_date=timezone.datetime(2017, 1, 1)
+        )
+        dag_task = DummyOperator(
+            task_id='dummy1',
+            dag=dag
+        )
+
+        initial_task_states = {
+            dag_task.task_id: State.SUCCESS,
+        }
+
+        dag_run = self.create_dag_run(dag=dag,
+                                      state=State.RUNNING,
+                                      task_states=initial_task_states)
+        dag_run.update_state()
+        stats_mock.assert_called()

Review comment:
       Can you please use `assert_called_once_with`?
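
A short sketch of why the stricter assertion matters, using a bare `mock.Mock` and a made-up argument instead of the patched `DagRun` method: `assert_called()` passes as long as there is at least one call, while `assert_called_once_with` also checks the call count and the arguments.

```python
from unittest import mock

stats_mock = mock.Mock()

# Simulate a bug in which the helper fires twice for a single state update.
stats_mock("dag_a")
stats_mock("dag_a")

stats_mock.assert_called()  # passes: only checks "at least once"

try:
    stats_mock.assert_called_once_with("dag_a")
    caught_duplicate = False
except AssertionError:
    caught_duplicate = True  # the stricter assertion flags the extra call
```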



