Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/08 18:00:07 UTC

[GitHub] [airflow] kaxil opened a new pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

kaxil opened a new pull request #11358:
URL: https://github.com/apache/airflow/pull/11358


   `start_date`, `end_date` and `duration` were not set for tasks that were failed because their DagRun's state had been set externally.
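A minimal sketch of the scenario, using the standard-library `sqlite3` module and hypothetical, heavily simplified `dag_run`/`task_instance` tables (the real ones have many more columns). The selection mirrors the `LEFT OUTER JOIN dag_run ... (dag_run.state != 'running' OR dag_run.state IS NULL)` condition visible in the SQL log quoted in this thread, and the update writes `state`, `start_date`, `end_date` and `duration` together from one Python-side timestamp. The helper name is made up for illustration, and setting `end_date`/`duration` unconditionally is only appropriate when the target state is a finished one such as `failed`.

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical, heavily simplified versions of the dag_run / task_instance tables.
SCHEMA = """
CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT);
CREATE TABLE task_instance (
    dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT,
    start_date TEXT, end_date TEXT, duration REAL
);
"""


def fail_tis_without_running_dagrun(conn, dag_id, old_state, new_state="failed"):
    """Hypothetical helper: set new_state plus timestamps on TIs whose DagRun is
    missing or not running. Assumes new_state is a finished state like 'failed'."""
    now = datetime.now(timezone.utc).isoformat()  # one timestamp for both dates
    cur = conn.execute(
        """
        UPDATE task_instance
           SET state = ?, start_date = ?, end_date = ?, duration = 0
         WHERE rowid IN (
               SELECT ti.rowid
                 FROM task_instance AS ti
                 LEFT OUTER JOIN dag_run AS dr
                   ON ti.dag_id = dr.dag_id AND ti.execution_date = dr.execution_date
                WHERE ti.dag_id = ? AND ti.state = ?
                  AND (dr.state != 'running' OR dr.state IS NULL))
        """,
        (new_state, now, now, dag_id, old_state),
    )
    return cur.rowcount  # number of task instances changed


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [("example_dag", "2020-10-01", "running"), ("example_dag", "2020-10-02", "failed")],
)
conn.executemany(
    "INSERT INTO task_instance (dag_id, task_id, execution_date, state) VALUES (?, ?, ?, ?)",
    [
        ("example_dag", "t1", "2020-10-01", "up_for_retry"),  # DagRun still running: untouched
        ("example_dag", "t1", "2020-10-02", "up_for_retry"),  # DagRun marked failed externally
        ("example_dag", "t1", "2020-10-03", "up_for_retry"),  # no DagRun row at all
    ],
)
changed = fail_tis_without_running_dagrun(conn, "example_dag", "up_for_retry")
rows = conn.execute(
    "SELECT execution_date, state, duration, start_date = end_date"
    " FROM task_instance ORDER BY execution_date"
).fetchall()
print(changed, rows)
```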
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r502411170



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       If it's finished, then won't it already have a duration?









[GitHub] [airflow] kaxil commented on a change in pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r502007626



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
             tis_to_change: List[TI] = query.with_for_update().all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
+                ti.duration = 0
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
             tis_changed = session \
                 .query(models.TaskInstance) \
                 .filter(
                     models.TaskInstance.dag_id == subq.c.dag_id,
                     models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
                     subq.c.execution_date) \
-                .update({models.TaskInstance.state: new_state}, synchronize_session=False)
+                .update({
+                    models.TaskInstance.state: new_state,
+                    models.TaskInstance.start_date: current_time,
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       >This doesn't feel right in the case of Sensing or Up-for-reschedule tasks. Hmmm
   
   That is true: we should only set end_date and duration when the new TI state is one of "failed", "success" or "skipped". Fixed in https://github.com/apache/airflow/pull/11358/commits/3b0d977d413b38e6b394287a6cbc85a6bc25b414
   
   >We can let the DB give us the time, no need to do a utcnow() this way.
   
   I did that for parity between MySQL, SQLite and Postgres: for MySQL and SQLite, `ti.set_state` is called, which also uses `timezone.utcnow()`.
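A minimal, self-contained sketch of the approach described above: build the bulk-update values from a single Python-side `current_time` and only include `end_date` and `duration` when the new state is finished. The plain string column names, the `build_ti_update` helper and the `FINISHED_STATES` set are stand-ins assumed for illustration, not Airflow's `models.TaskInstance` attributes or `State.finished()`.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Stand-in for State.finished(); per this thread it covers success, failed and skipped.
FINISHED_STATES = frozenset({"success", "failed", "skipped"})


def build_ti_update(new_state: str, current_time: Optional[datetime] = None) -> Dict[str, Any]:
    """Hypothetical helper: column -> value mapping for a bulk task-instance UPDATE."""
    if current_time is None:
        # Same clock the per-row path uses: an aware UTC timestamp.
        current_time = datetime.now(timezone.utc)
    update: Dict[str, Any] = {"state": new_state, "start_date": current_time}
    # end_date/duration only make sense once the TI has finished; a sensing or
    # up_for_reschedule TI is still in progress, so those columns are left alone.
    if new_state in FINISHED_STATES:
        update.update({"end_date": current_time, "duration": 0})
    return update


t = datetime(2020, 10, 9, 9, 37, 10, tzinfo=timezone.utc)
print(build_ti_update("failed", t))
print(build_ti_update("up_for_reschedule", t))
```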
   







[GitHub] [airflow] kaxil commented on a change in pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r502394258



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       We won't be setting 0 for Sensing, only for TIs in `State.finished()`, which are failed, success or skipped.







[GitHub] [airflow] kaxil merged pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #11358:
URL: https://github.com/apache/airflow/pull/11358


   





[GitHub] [airflow] ashb commented on a change in pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r501980867



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
             tis_to_change: List[TI] = query.with_for_update().all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
+                ti.duration = 0
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
             tis_changed = session \
                 .query(models.TaskInstance) \
                 .filter(
                     models.TaskInstance.dag_id == subq.c.dag_id,
                     models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
                     subq.c.execution_date) \
-                .update({models.TaskInstance.state: new_state}, synchronize_session=False)
+                .update({
+                    models.TaskInstance.state: new_state,
+                    models.TaskInstance.start_date: current_time,
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       This doesn't feel right in the case of Sensing or Up-for-reschedule tasks. Hmmm
   
   Maybe we want:
   
   ```suggestion
                       models.TaskInstance.state: new_state,
                       models.TaskInstance.start_date: func.coalesce(models.TaskInstance.start_date, current_time),
                       models.TaskInstance.end_date: func.coalesce(models.TaskInstance.end_date, current_time),
                       models.TaskInstance.duration: models.TaskInstance.end_date - models.TaskInstance.start_date,
   ```
   
   (I'm not sure if that last line works right -- i.e. does it use the pre-update values or post-update.)
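One way to answer the open question for a specific backend is to try it. The sketch below uses the standard-library `sqlite3` module and a hypothetical minimal `task_instance` table. SQLite evaluates every right-hand side of `SET` against the row as it was before the update, so `duration` is computed from the old (here `NULL`) dates, not the coalesced ones. Other databases may differ (MySQL, for instance, documents that single-table `UPDATE` assignments are generally evaluated left to right), so this demonstrates SQLite's behaviour only; `julianday()` is used as SQLite's stand-in for timestamp subtraction.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (task_id TEXT, start_date TEXT, end_date TEXT, duration REAL)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, NULL)",
    [
        ("never_ran", None, None),  # no dates yet
        ("ran", "2020-10-09 09:00:00", "2020-10-09 09:00:30"),  # already has both dates
    ],
)

current_time = "2020-10-09 09:37:10"
conn.execute(
    """
    UPDATE task_instance
       SET start_date = COALESCE(start_date, ?),
           end_date   = COALESCE(end_date, ?),
           -- julianday() differences are in days; * 86400 converts to seconds
           duration   = (julianday(end_date) - julianday(start_date)) * 86400.0
    """,
    (current_time, current_time),
)

rows = {
    task_id: (start, end, None if dur is None else round(dur, 3))
    for task_id, start, end, dur in conn.execute("SELECT * FROM task_instance")
}
print(rows)
```

On SQLite, then, the suggested `duration` expression would stay `NULL` for rows that had no dates before the update; the `COALESCE` would have to be repeated inside the `duration` expression for it to see the new values.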







[GitHub] [airflow] kaxil commented on a change in pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r502312132



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       That suggestion errors with the following. Since the duration will always be 0 here, I think setting it to 0 explicitly is fine, compared to casting it to float:
   
   ```
       cursor.execute(statement, parameters)
   sqlalchemy.exc.ProgrammingError: (psycopg2.errors.DatatypeMismatch) column "duration" is of type double precision but expression is of type interval
   LINE 1: ...09T09:37:10.897145+00:00'::timestamptz, duration=(task_insta...
                                                                ^
   HINT:  You will need to rewrite or cast the expression.
   
   [SQL: UPDATE task_instance SET start_date=%(start_date)s, end_date=%(end_date)s, duration=(task_instance.end_date - task_instance.start_date), state=%(state)s FROM (SELECT task_instance.try_number AS try_number, task_instance.task_id AS task_id, task_instance.dag_id AS dag_id, task_instance.execution_date AS execution_date, task_instance.start_date AS start_date, task_instance.end_date AS end_date, task_instance.duration AS duration, task_instance.state AS state, task_instance.max_tries AS max_tries, task_instance.hostname AS hostname, task_instance.unixname AS unixname, task_instance.job_id AS job_id, task_instance.pool AS pool, task_instance.pool_slots AS pool_slots, task_instance.queue AS queue, task_instance.priority_weight AS priority_weight, task_instance.operator AS operator, task_instance.queued_dttm AS queued_dttm, task_instance.queued_by_job_id AS queued_by_job_id, task_instance.pid AS pid, task_instance.executor_config AS executor_config, task_instance.external_executor_id AS external_executor_id
   FROM task_instance LEFT OUTER JOIN dag_run ON task_instance.dag_id = dag_run.dag_id AND task_instance.execution_date = dag_run.execution_date
   WHERE task_instance.dag_id IN (%(dag_id_1)s) AND task_instance.state IN (%(state_1)s) AND (dag_run.state != %(state_2)s OR dag_run.state IS NULL)) AS anon_1 WHERE task_instance.dag_id = anon_1.dag_id AND task_instance.task_id = anon_1.task_id AND task_instance.execution_date = anon_1.execution_date]
   [parameters: {'start_date': datetime.datetime(2020, 10, 9, 9, 37, 10, 897145, tzinfo=Timezone('UTC')), 'end_date': datetime.datetime(2020, 10, 9, 9, 37, 10, 897145, tzinfo=Timezone('UTC')), 'state': 'failed', 'dag_id_1': 'test_execute_helper_should_change_state_for_tis_without_dagrun', 'state_1': 'up_for_retry', 'state_2': 'running'}]
   (Background on this error at: http://sqlalche.me/e/13/f405)
   ```
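The error above arises because on PostgreSQL subtracting two timestamps yields an `interval`, while the `duration` column is `double precision` and holds a number of seconds. Python's `datetime` arithmetic has the same shape: the difference is a `timedelta` that has to be converted explicitly. This plain standard-library sketch (the `duration_seconds` helper is hypothetical, not Airflow code) shows that when `start_date` and `end_date` come from the same `current_time`, the converted duration is exactly `0.0`, which is why writing the literal `0` is equivalent here.

```python
from datetime import datetime, timezone


def duration_seconds(start_date: datetime, end_date: datetime) -> float:
    """Convert a timestamp difference (a timedelta, like a PostgreSQL interval)
    into the float number of seconds a double-precision column expects."""
    return (end_date - start_date).total_seconds()


# Both columns are set from one timestamp, as in the bulk-update branch.
current_time = datetime(2020, 10, 9, 9, 37, 10, 897145, tzinfo=timezone.utc)
print(current_time - current_time)                   # 0:00:00 (a timedelta, not a number)
print(duration_seconds(current_time, current_time))  # 0.0
```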









[GitHub] [airflow] ashb commented on a change in pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r502458350



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       Ahhh, right yes.







[GitHub] [airflow] ashb commented on a change in pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r501980867



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
             tis_to_change: List[TI] = query.with_for_update().all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
+                ti.duration = 0
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
             tis_changed = session \
                 .query(models.TaskInstance) \
                 .filter(
                     models.TaskInstance.dag_id == subq.c.dag_id,
                     models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
                     subq.c.execution_date) \
-                .update({models.TaskInstance.state: new_state}, synchronize_session=False)
+                .update({
+                    models.TaskInstance.state: new_state,
+                    models.TaskInstance.start_date: current_time,
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       This doesn't feel right in the case of Sensing or Up-for-reschedule tasks. Hmmm
   
   Maybe we want:
   
   ```suggestion
                       models.TaskInstance.state: new_state,
                       models.TaskInstance.start_date: func.coalesce(models.TaskInstance.start_date, func.now()),
                       models.TaskInstance.end_date: func.coalesce(models.TaskInstance.end_date, func.now()),
                       models.TaskInstance.duration: models.TaskInstance.end_date - models.TaskInstance.start_date,
   ```
   
   (I'm not sure if that last line works right -- i.e. does it use the pre-update values or post-update.)
   
   We can let the DB give us the time, no need to do a `utcnow()` this way.
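For comparison with the Python-side `current_time`, here is a sketch of letting the database supply the timestamp, as suggested above. It uses the standard-library `sqlite3` module, with SQLite's `CURRENT_TIMESTAMP` standing in for `func.now()`, and a hypothetical minimal table; the `COALESCE` keeps any existing `start_date`.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, state TEXT, start_date TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, 'up_for_retry', ?)",
    [("never_ran", None), ("ran", "2020-10-09 09:00:00")],
)

# The database clock fills start_date only where it is still NULL.
conn.execute(
    "UPDATE task_instance "
    "SET state = 'failed', start_date = COALESCE(start_date, CURRENT_TIMESTAMP)"
)
start_dates = dict(conn.execute("SELECT task_id, start_date FROM task_instance"))

# SQLite's CURRENT_TIMESTAMP is UTC text with whole-second precision, whereas
# Python's datetime.now(timezone.utc) is timezone-aware with microseconds.
db_now = datetime.strptime(start_dates["never_ran"], "%Y-%m-%d %H:%M:%S").replace(
    tzinfo=timezone.utc
)
py_now = datetime.now(timezone.utc)
print(start_dates["ran"], db_now, py_now)
```

The differing format and precision illustrates the kind of mismatch behind the parity concern raised in this thread: with one Python-side timestamp, the per-row `ti.set_state` path and the bulk-update path write values from the same clock.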

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
             tis_to_change: List[TI] = query.with_for_update().all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
+                ti.duration = 0

Review comment:
       Shouldn't this be in ti.duration?

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       ```suggestion
                       models.TaskInstance.duration: models.TaskInstance.end_date - models.TaskInstance.start_date,
   ```

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       My thought is that if the task is in the sensing state, the duration won't be 0.
   
   But it turns out this is not trivial to do portably: on Postgres it is `EXTRACT(EPOCH from end_date - start_date)`, which is likely not worth it.
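To illustrate why a portable `duration` expression is awkward, here is a minimal sketch of a hypothetical helper that returns a per-dialect SQL expression for "seconds between two timestamp columns". The PostgreSQL form follows the one quoted above; the MySQL (`TIMESTAMPDIFF` with `MICROSECOND`) and SQLite (`julianday`) forms are assumptions chosen for illustration, and only the SQLite one is exercised here via the standard-library `sqlite3` module.

```python
import sqlite3


def duration_sql(dialect: str, start_col: str = "start_date", end_col: str = "end_date") -> str:
    """Hypothetical helper: SQL for the number of seconds between two timestamp columns.
    Column names are interpolated directly, so they must be trusted identifiers."""
    if dialect == "postgresql":
        # timestamp - timestamp is an interval; EXTRACT(EPOCH ...) turns it into seconds.
        return f"EXTRACT(EPOCH FROM ({end_col} - {start_col}))"
    if dialect == "mysql":
        return f"TIMESTAMPDIFF(MICROSECOND, {start_col}, {end_col}) / 1000000.0"
    if dialect == "sqlite":
        # julianday() returns fractional days.
        return f"(julianday({end_col}) - julianday({start_col})) * 86400.0"
    raise ValueError(f"unsupported dialect: {dialect!r}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (start_date TEXT, end_date TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('2020-10-09 09:00:00', '2020-10-09 09:01:30')")
(seconds,) = conn.execute(f"SELECT {duration_sql('sqlite')} FROM task_instance").fetchone()
print(round(seconds, 3))  # 90.0
```

Three dialects already need three different expressions, which supports the conclusion that an explicit `0` is the simpler choice for this code path.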









[GitHub] [airflow] kaxil commented on a change in pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r502006285



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
             tis_to_change: List[TI] = query.with_for_update().all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
+                ti.duration = 0

Review comment:
       In `ti.set_state`, right? Fixed in https://github.com/apache/airflow/pull/11358/commits/3b0d977d413b38e6b394287a6cbc85a6bc25b414
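A minimal sketch of what handling the timestamps inside `set_state` could look like, using a hypothetical stand-in class rather than Airflow's real `TaskInstance` (no session or ORM involved). `start_date` is filled only if missing, in the spirit of the `COALESCE` suggestion in this thread, and `end_date` plus `duration` (in seconds) are set only for finished states. `FakeTaskInstance` and `FINISHED_STATES` are assumptions for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

FINISHED_STATES = frozenset({"success", "failed", "skipped"})  # stand-in for State.finished()


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance row."""
    state: Optional[str] = None
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None
    duration: Optional[float] = None

    def set_state(self, state: str, current_time: Optional[datetime] = None) -> None:
        if current_time is None:
            current_time = datetime.now(timezone.utc)
        self.state = state
        # Keep an existing start_date (e.g. a TI that was already sensing).
        self.start_date = self.start_date or current_time
        if state in FINISHED_STATES:
            self.end_date = self.end_date or current_time
            self.duration = (self.end_date - self.start_date).total_seconds()


t0 = datetime(2020, 10, 9, 9, 37, 10, tzinfo=timezone.utc)
never_ran = FakeTaskInstance(state="up_for_retry")
never_ran.set_state("failed", current_time=t0)
sensing = FakeTaskInstance(state="sensing", start_date=t0.replace(minute=30))
sensing.set_state("failed", current_time=t0)
print(never_ran.duration, sensing.duration)  # 0.0 420.0
```

Because an existing `start_date` is kept, a TI that was already sensing when it is failed externally gets a non-zero duration, while one that never ran gets exactly `0.0`.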

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1159,17 +1159,24 @@ def _change_state_for_tis_without_dagrun(
             tis_to_change: List[TI] = query.with_for_update().all()
             for ti in tis_to_change:
                 ti.set_state(new_state, session=session)
+                ti.duration = 0
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
             tis_changed = session \
                 .query(models.TaskInstance) \
                 .filter(
                     models.TaskInstance.dag_id == subq.c.dag_id,
                     models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
                     subq.c.execution_date) \
-                .update({models.TaskInstance.state: new_state}, synchronize_session=False)
+                .update({
+                    models.TaskInstance.state: new_state,
+                    models.TaskInstance.start_date: current_time,
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       >This doesn't feel right in the case of Sensing or Up-for-reschedule tasks. Hmmm
   
   That is true: we should only set `end_date` and `duration` when the new TI state is one of "failed", "success" or "skipped". Fixed in https://github.com/apache/airflow/pull/11358/commits/3b0d977d413b38e6b394287a6cbc85a6bc25b414
   
   >We can let the DB give us the time, no need to do a utcnow() this way.
   
   I did that for parity between MySQL, SQLite and Postgres: on MySQL and SQLite, `ti.set_state` is called, and it also uses `timezone.utcnow()`.
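The rule (only finished states get `end_date` and `duration`) and the single Python-side timestamp can be sketched without Airflow. `build_ti_update` is a hypothetical helper. It uses plain column-name strings where the PR uses `models.TaskInstance.*` attributes, and the finished set is assumed from this thread.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# Assumed equivalent of State.finished(): success, failed, skipped.
FINISHED_STATES = frozenset({"success", "failed", "skipped"})


def build_ti_update(new_state: Optional[str], current_time: datetime) -> Dict[str, Any]:
    """Hypothetical helper: column name -> new value for the bulk UPDATE."""
    update: Dict[str, Any] = {"state": new_state, "start_date": current_time}
    # Only finished states get end_date/duration. Start and end share one
    # timestamp taken in Python, so the duration is exactly 0 seconds.
    if new_state in FINISHED_STATES:
        update["end_date"] = current_time
        update["duration"] = 0
    return update


now = datetime.now(timezone.utc)
failed_update = build_ti_update("failed", now)
reset_update = build_ti_update(None, now)
print(sorted(failed_update))  # ['duration', 'end_date', 'start_date', 'state']
print(sorted(reset_update))  # ['start_date', 'state']
```

Taking the timestamp once in Python, rather than from the database clock, keeps the bulk path consistent with the per-row `ti.set_state` path.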
   

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       This errors with the following. Since the value will always be 0, I think setting it to 0 explicitly here is better than casting it to float:
   
   ```
       cursor.execute(statement, parameters)
   sqlalchemy.exc.ProgrammingError: (psycopg2.errors.DatatypeMismatch) column "duration" is of type double precision but expression is of type interval
   LINE 1: ...09T09:37:10.897145+00:00'::timestamptz, duration=(task_insta...
                                                                ^
   HINT:  You will need to rewrite or cast the expression.
   
   [SQL: UPDATE task_instance SET start_date=%(start_date)s, end_date=%(end_date)s, duration=(task_instance.end_date - task_instance.start_date), state=%(state)s FROM (SELECT task_instance.try_number AS try_number, task_instance.task_id AS task_id, task_instance.dag_id AS dag_id, task_instance.execution_date AS execution_date, task_instance.start_date AS start_date, task_instance.end_date AS end_date, task_instance.duration AS duration, task_instance.state AS state, task_instance.max_tries AS max_tries, task_instance.hostname AS hostname, task_instance.unixname AS unixname, task_instance.job_id AS job_id, task_instance.pool AS pool, task_instance.pool_slots AS pool_slots, task_instance.queue AS queue, task_instance.priority_weight AS priority_weight, task_instance.operator AS operator, task_instance.queued_dttm AS queued_dttm, task_instance.queued_by_job_id AS queued_by_job_id, task_instance.pid AS pid, task_instance.executor_config AS executor_config, task_instance.external_executor_id AS external_executor_id
   FROM task_instance LEFT OUTER JOIN dag_run ON task_instance.dag_id = dag_run.dag_id AND task_instance.execution_date = dag_run.execution_date
   WHERE task_instance.dag_id IN (%(dag_id_1)s) AND task_instance.state IN (%(state_1)s) AND (dag_run.state != %(state_2)s OR dag_run.state IS NULL)) AS anon_1 WHERE task_instance.dag_id = anon_1.dag_id AND task_instance.task_id = anon_1.task_id AND task_instance.execution_date = anon_1.execution_date]
   [parameters: {'start_date': datetime.datetime(2020, 10, 9, 9, 37, 10, 897145, tzinfo=Timezone('UTC')), 'end_date': datetime.datetime(2020, 10, 9, 9, 37, 10, 897145, tzinfo=Timezone('UTC')), 'state': 'failed', 'dag_id_1': 'test_execute_helper_should_change_state_for_tis_without_dagrun', 'state_1': 'up_for_retry', 'state_2': 'running'}]
   (Background on this error at: http://sqlalche.me/e/13/f405)
   ```
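The error arises because subtracting two timestamps in Postgres yields an `interval`, while `duration` is a `double precision` column holding seconds. Python shows the same split. Subtracting two `datetime` objects gives a `timedelta`, which has to be converted with `total_seconds()` before it fits a float column. In this branch `start_date` and `end_date` are both set from the same `current_time`, so the result is always 0. The timestamp below is taken from the error parameters above.

```python
from datetime import datetime, timedelta, timezone

# Both columns are set from the same value in the bulk UPDATE.
current_time = datetime(2020, 10, 9, 9, 37, 10, 897145, tzinfo=timezone.utc)
start_date = current_time
end_date = current_time

interval = end_date - start_date  # timedelta: analogous to a Postgres interval
duration = interval.total_seconds()  # float: what a double precision column expects

print(type(interval).__name__, duration)  # timedelta 0.0
```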

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       We won't be setting 0 for sensing tasks, only for TIs whose new state is in `State.finished()`, i.e. failed, success or skipped.

##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       I was not clear before: it is the `new_state`, i.e. the state we want to set the TI to. See the following code block:
   
   https://github.com/apache/airflow/blob/3b0d977d413b38e6b394287a6cbc85a6bc25b414/airflow/jobs/scheduler_job.py#L1787-L1802
   
   So we only want to set `end_date` and `duration` when setting `new_state` to failed, not when setting it to `None`, since that task will be run again.
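The following `sqlite3` sketch illustrates the distinction. TIs moved to a finished state get an `end_date` and a zero `duration`. TIs reset to `None` keep those columns unset so that the next run can fill them in. The table, task ids and the `reset_or_fail` helper are hypothetical, and the finished set is assumed. Only the branching on `new_state` follows the diff.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Optional

FINISHED_STATES = {"success", "failed", "skipped"}  # assumed State.finished()

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE ti (task_id TEXT PRIMARY KEY, state TEXT, "
    "start_date TEXT, end_date TEXT, duration REAL)"
)
conn.executemany(
    "INSERT INTO ti (task_id, state) VALUES (?, ?)",
    [("retrying_task", "up_for_retry"), ("queued_task", "queued")],
)


def reset_or_fail(task_id: str, new_state: Optional[str]) -> None:
    """Hypothetical helper mirroring the branch on new_state."""
    current_time = datetime.now(timezone.utc).isoformat()
    columns = {"state": new_state, "start_date": current_time}
    if new_state in FINISHED_STATES:
        columns.update({"end_date": current_time, "duration": 0.0})
    # Column names come from the fixed dict above; values are bound parameters.
    assignments = ", ".join(f"{name} = ?" for name in columns)
    conn.execute(
        f"UPDATE ti SET {assignments} WHERE task_id = ?",
        [*columns.values(), task_id],
    )


reset_or_fail("retrying_task", "failed")  # will not be retried: finish it
reset_or_fail("queued_task", None)  # will run again: leave end_date/duration unset

rows = conn.execute(
    "SELECT task_id, state, end_date IS NOT NULL, duration FROM ti ORDER BY task_id"
).fetchall()
for row in rows:
    print(row)
# ('queued_task', None, 0, None)
# ('retrying_task', 'failed', 1, 0.0)
```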







[GitHub] [airflow] ashb commented on a change in pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r502393539



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       My thought is that if the task is in the sensing state, the duration won't be 0.
   
   It turns out this is not trivial to do portably, though: on Postgres it is `EXTRACT(EPOCH from end_date - start_date)`, which is likely not worth it.
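Computing `duration` in SQL would need a per-dialect expression. The Postgres form below is the one quoted above. The MySQL and SQLite forms are assumptions and do not come from the PR: `TIMESTAMPDIFF` in microseconds, and a `julianday` difference scaled to seconds. Only the SQLite variant is exercised here, through the standard-library `sqlite3` module.

```python
import sqlite3

# Per-dialect SQL for "seconds between start_date and end_date" (illustrative).
DURATION_SQL = {
    "postgresql": "EXTRACT(EPOCH FROM end_date - start_date)",
    "mysql": "TIMESTAMPDIFF(MICROSECOND, start_date, end_date) / 1000000.0",
    "sqlite": "(julianday(end_date) - julianday(start_date)) * 86400.0",
}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (start_date TEXT, end_date TEXT, duration REAL)")
conn.execute(
    "INSERT INTO ti (start_date, end_date) VALUES (?, ?)",
    ("2020-10-09 09:37:10.000", "2020-10-09 09:37:15.500"),
)
conn.execute(f"UPDATE ti SET duration = {DURATION_SQL['sqlite']}")

(duration,) = conn.execute("SELECT duration FROM ti").fetchone()
print(round(duration, 3))  # 5.5
```

Three expressions that must agree across backends, each with its own precision behaviour, support the view that writing 0 for the same-timestamp case is simpler.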







[GitHub] [airflow] ashb commented on a change in pull request #11358: Set start_date, end_date & duration for tasks failing without DagRun

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #11358:
URL: https://github.com/apache/airflow/pull/11358#discussion_r502267226



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1162,14 +1162,27 @@ def _change_state_for_tis_without_dagrun(
                 tis_changed += 1
         else:
             subq = query.subquery()
+            current_time = timezone.utcnow()
+            ti_prop_update = {
+                models.TaskInstance.state: new_state,
+                models.TaskInstance.start_date: current_time,
+            }
+
+            # Only add end_date and duration if the new_state is 'success', 'failed' or 'skipped'
+            if new_state in State.finished():
+                ti_prop_update.update({
+                    models.TaskInstance.end_date: current_time,
+                    models.TaskInstance.duration: 0,

Review comment:
       ```suggestion
                       models.TaskInstance.duration: models.TaskInstance.end_date - models.TaskInstance.start_date,
   ```






