Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/19 17:20:31 UTC

[GitHub] [airflow] RNHTTR opened a new pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

RNHTTR opened a new pull request #14321:
URL: https://github.com/apache/airflow/pull/14321


   Task instances were not terminated when dagrun_timeout was exceeded. New DAG runs could still be created, but task instances within those new runs were never started. As alluded to in #13407, the query that determines whether a DAG is active is based on [task instances being unfinished](https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L1502-L1512). Because task instances were not moved to a `State.finished` state when their DAG run was set to the `FAILED` state, the scheduler continued to treat the DAG as `RUNNING`, and tasks in new DAG runs were never scheduled.
   
   closes: #12912
   related: #13407
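   
   For context, a condensed, self-contained sketch of the approach taken here (adapted from the diff quoted in the review comments below; the standalone helper `fail_timed_out_run` and its signature are illustrative only, not scheduler API):
   
    ``` python
    from datetime import timedelta

    from airflow.models.taskinstance import TaskInstance as TI
    from airflow.utils import timezone
    from airflow.utils.state import State


    def fail_timed_out_run(dag_run, dagrun_timeout: timedelta, session) -> None:
        """Fail a run that exceeded its timeout AND finish its task instances,
        so the scheduler's unfinished-task check stops treating it as active."""
        if dagrun_timeout and dag_run.start_date < timezone.utcnow() - dagrun_timeout:
            dag_run.set_state(State.FAILED)
            unfinished = (
                session.query(TI)
                .filter(TI.dag_id == dag_run.dag_id)
                .filter(TI.execution_date == dag_run.execution_date)
                .filter(TI.state.in_(State.unfinished))
            )
            for ti in unfinished:
                # Any State.finished value unblocks scheduling; SKIPPED reflects
                # that these tasks never got a chance to run.
                ti.state = State.SKIPPED
            session.flush()
    ```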
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil merged pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #14321:
URL: https://github.com/apache/airflow/pull/14321


   





[GitHub] [airflow] kaxil commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r580978156



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       I wonder what the old behaviour was in 1.10.x
   
   We should add a test for it too.
   
   cc @ashb 







[GitHub] [airflow] RNHTTR commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r582006481



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       I believe setting the task instance state to failed would have the same effect; it just has to be a state in `State.finished`. I chose skipped because I figured the tasks didn’t even get a chance to run.
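   
   A minimal sketch of the membership this relies on (assuming the `State` sets shipped with Airflow 2.0's `airflow.utils.state`):
   
    ``` python
    from airflow.utils.state import State

    # Any member of State.finished unblocks the scheduler's unfinished-task
    # check; choosing SKIPPED over FAILED is only a semantic difference.
    assert State.SKIPPED in State.finished
    assert State.FAILED in State.finished
    assert State.SUCCESS in State.finished

    assert State.RUNNING in State.unfinished
    assert State.QUEUED in State.unfinished
    ```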







[GitHub] [airflow] RNHTTR commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r584373939



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       The test seems to be successful with a 1-second sleep. Pushing the update now.







[GitHub] [airflow] RNHTTR commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r584217159



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       @kaxil Would it be bad to use `time.sleep` to force a timeout in the test? I'm not sure there's another way to indicate that a DAG run has timed out.
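   
   A rough sketch of why a short sleep is enough (the condition mirrors the one in the diff above; `run_has_timed_out` is an illustrative helper, not scheduler API):
   
    ``` python
    from datetime import timedelta
    from time import sleep

    from airflow.utils import timezone


    def run_has_timed_out(start_date, dagrun_timeout: timedelta) -> bool:
        # Mirrors the check above: a run is overdue once its start_date falls
        # before "now minus dagrun_timeout".
        return bool(dagrun_timeout) and start_date < timezone.utcnow() - dagrun_timeout


    start = timezone.utcnow()
    sleep(1.5)  # sleeping just past a 1-second dagrun_timeout trips the check
    assert run_has_timed_out(start, timedelta(seconds=1))
    ```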







[GitHub] [airflow] kaxil commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r580978156



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       I wonder what the old behaviour was in 1.10.x -- skip vs failed
   
   We should add a test for it too.
   
   cc @ashb 







[GitHub] [airflow] RNHTTR commented on pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#issuecomment-782758937


   It seems odd to me that this failed -- the only logs for the failure indicate a timeout in the CI. Unless I'm missing something, which is highly possible.





[GitHub] [airflow] RNHTTR commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r585066585



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       @kaxil I think the CI failed due to a transient error again...







[GitHub] [airflow] RNHTTR commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r584217795



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       E.g:
   
   ``` python
   def test_do_schedule_max_active_runs_dag_timed_out(self):
       """Test that tasks are set to a finished state when their DAG times out"""
       dag = DAG(
           dag_id='test_max_active_run_with_dag_timed_out',
           start_date=DEFAULT_DATE,
           schedule_interval='@once',
           max_active_runs=1,
           catchup=True,
       )
       # Arbitrary timeout long enough to allow run2 to instantiate and queue a task
       dag.dagrun_timeout = datetime.timedelta(seconds=10)
   
       with dag:
           task1 = BashOperator(
               task_id='task1',
               bash_command=' for((i=1;i<=600;i+=1)); do sleep "$i";  done',
           )
   
       session = settings.Session()
       dagbag = DagBag(
           dag_folder=os.devnull,
           include_examples=False,
           read_dags_from_db=True,
       )
       
       dagbag.bag_dag(dag=dag, root_dag=dag)
       dagbag.sync_to_db(session=session)
   
       run1 = dag.create_dagrun(
           run_type=DagRunType.SCHEDULED,
           execution_date=DEFAULT_DATE,
           state=State.RUNNING,
           session=session,
       )
   
       sleep(10)
   
       run2 = dag.create_dagrun(
           run_type=DagRunType.SCHEDULED,
           execution_date=DEFAULT_DATE + timedelta(hours=1),
           state=State.RUNNING,
           session=session,
       )
   
       dag.sync_to_db(session=session)
   
       job = SchedulerJob(subdir=os.devnull)
       job.executor = MockExecutor(do_update=False)
       job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
   
       num_queued = job._do_scheduling(session)
   
       assert num_queued == 1
       ti = run2.get_task_instance(task1.task_id, session)
       assert ti.state == State.QUEUED    
       ```







[GitHub] [airflow] kaxil commented on pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#issuecomment-783491002


   > It seems odd to me that this failed -- the only logs for the failure indicate a timeout in the CI. Unless I'm missing something, which is highly possible.
   
   I have re-run the tests again -- it might be a one-off failure





[GitHub] [airflow] RNHTTR commented on pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#issuecomment-783650434


   Looks like that was the case :)





[GitHub] [airflow] RNHTTR commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r584217795



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       E.g:
   
   ``` python
   def test_do_schedule_max_active_runs_dag_timed_out(self):
       """Test that tasks are set to a finished state when their DAG times out"""
   
       dag = DAG(
           dag_id='test_max_active_run_with_dag_timed_out',
           start_date=DEFAULT_DATE,
           schedule_interval='@once',
           max_active_runs=1,
           catchup=True,
       )
       dag.dagrun_timeout = datetime.timedelta(seconds=1)
   
       with dag:
           task1 = BashOperator(
               task_id='task1',
               bash_command=' for((i=1;i<=600;i+=1)); do sleep "$i";  done',
           )
   
       session = settings.Session()
       dagbag = DagBag(
           dag_folder=os.devnull,
           include_examples=False,
           read_dags_from_db=True,
       )
       
       dagbag.bag_dag(dag=dag, root_dag=dag)
       dagbag.sync_to_db(session=session)
   
       run1 = dag.create_dagrun(
           run_type=DagRunType.SCHEDULED,
           execution_date=DEFAULT_DATE,
           state=State.RUNNING,
           session=session,
       )
       run1_ti = run1.get_task_instance(task1.task_id, session)
       run1_ti.state = State.RUNNING
   
       sleep(1)
   
       run2 = dag.create_dagrun(
           run_type=DagRunType.SCHEDULED,
           execution_date=DEFAULT_DATE + timedelta(seconds=10),
           state=State.RUNNING,
           session=session,
       )
   
       dag.sync_to_db(session=session)
   
       job = SchedulerJob(subdir=os.devnull)
       job.executor = MockExecutor()
       job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
   
       num_queued = job._do_scheduling(session)
   
       assert run1.state == State.FAILED
       assert run1_ti.state == State.SKIPPED
       assert run2.state == State.RUNNING
   
       num_queued = job._do_scheduling(session)
       run2_ti = run2.get_task_instance(task1.task_id, session)
        assert run2_ti.state == State.QUEUED
    ```







[GitHub] [airflow] RNHTTR commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r584217159



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       @kaxil Would it be bad to use `time.sleep` to force a timeout in the test?







[GitHub] [airflow] RNHTTR commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r584126071



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       Sure thing!







[GitHub] [airflow] github-actions[bot] commented on pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#issuecomment-791059325


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it to the latest master at your convenience, or amend the last commit of the PR and push it with --force-with-lease.





[GitHub] [airflow] kaxil commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r582042857



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       Sounds good. Can you please add a test to cover this case?







[GitHub] [airflow] RNHTTR commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r584373939



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       The test seems to be successful with a 1-second sleep. I just pushed a commit including a test for this scenario.







[GitHub] [airflow] RNHTTR commented on a change in pull request #14321: Fix bug allowing task instances to survive when dagrun_timeout is exceeded

Posted by GitBox <gi...@apache.org>.
RNHTTR commented on a change in pull request #14321:
URL: https://github.com/apache/airflow/pull/14321#discussion_r584217795



##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1696,10 +1696,18 @@ def _schedule_dag_run(
             and dag.dagrun_timeout
             and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout
         ):
-            dag_run.state = State.FAILED
-            dag_run.end_date = timezone.utcnow()
-            self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
+            dag_run.set_state(State.FAILED)
+            unfinished_task_instances = (
+                session.query(TI)
+                .filter(TI.dag_id == dag_run.dag_id)
+                .filter(TI.execution_date == dag_run.execution_date)
+                .filter(TI.state.in_(State.unfinished))
+            )
+            for task_instance in unfinished_task_instances:
+                task_instance.state = State.SKIPPED

Review comment:
       E.g:
   
   ``` python
   def test_do_schedule_max_active_runs_dag_timed_out(self):
       """Test that tasks are set to a finished state when their DAG times out"""
       dag = DAG(
           dag_id='test_max_active_run_with_dag_timed_out',
           start_date=DEFAULT_DATE,
           schedule_interval='@once',
           max_active_runs=1,
           catchup=True,
       )
       # Arbitrary timeout long enough to allow run2 to instantiate and queue a task
       dag.dagrun_timeout = datetime.timedelta(seconds=10)
   
       with dag:
           task1 = BashOperator(
               task_id='task1',
               bash_command=' for((i=1;i<=600;i+=1)); do sleep "$i";  done',
           )
   
       session = settings.Session()
       dagbag = DagBag(
           dag_folder=os.devnull,
           include_examples=False,
           read_dags_from_db=True,
       )
       
       dagbag.bag_dag(dag=dag, root_dag=dag)
       dagbag.sync_to_db(session=session)
   
       run1 = dag.create_dagrun(
           run_type=DagRunType.SCHEDULED,
           execution_date=DEFAULT_DATE,
           state=State.RUNNING,
           session=session,
       )
   
       run2 = dag.create_dagrun(
           run_type=DagRunType.SCHEDULED,
           execution_date=DEFAULT_DATE + timedelta(hours=1),
           state=State.RUNNING,
           session=session,
       )
   
       sleep(10)
   
       dag.sync_to_db(session=session)
   
       job = SchedulerJob(subdir=os.devnull)
       job.executor = MockExecutor(do_update=False)
       job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
   
       num_queued = job._do_scheduling(session)
   
       assert num_queued == 1
       ti = run2.get_task_instance(task1.task_id, session)
       assert ti.state == State.QUEUED    
       ```



