Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/17 21:47:52 UTC

[airflow] 01/06: Run mini scheduler in LocalTaskJob during task exit (#16289)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 970c6a2af420156c51137772e138d4f6e7313360
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Thu Jun 10 14:29:30 2021 +0100

    Run mini scheduler in LocalTaskJob during task exit (#16289)
    
    Currently, the chance of a task being killed by the LocalTaskJob heartbeat is high.
    
    This is because, when the mini scheduler is enabled, it starts running right after the task is marked
    successful/failed in taskinstance.py. Whenever the mini scheduler takes long enough to overlap the next job
    heartbeat, the heartbeat sees that the task has finished while the task process still has no return code,
    because LocalTaskJob.handle_task_exit has not been called yet. Hence, the heartbeat concludes that the task
    was externally marked failed/successful and kills it.
    
    This change resolves the problem by moving the mini scheduler into LocalTaskJob.handle_task_exit, ensuring
    that the task is no longer killed by the next heartbeat.
    
    (cherry picked from commit 408bd26c22913af93d05aa70abc3c66c52cd4588)
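The race described in the commit message can be illustrated with a small, self-contained Python model. Everything in it (TaskProcess, heartbeat_verdict, run_old_order, run_new_order and the state strings) is hypothetical and deliberately simplified; it is not Airflow's LocalTaskJob code. It only assumes, as the message describes, that the heartbeat treats "task finished but no return code yet" as an external state change, and shows why running the mini scheduler after the process has exited avoids that verdict.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TaskProcess:
    """Hypothetical stand-in for a task run supervised by LocalTaskJob."""
    state: str = "running"             # task instance state as stored in the DB
    return_code: Optional[int] = None  # None while the task process is still alive


def heartbeat_verdict(proc: TaskProcess) -> str:
    """Simplified heartbeat check (hypothetical, not Airflow's implementation).

    A finished state with no return code looks like an external state change,
    so the supervisor would terminate the task.
    """
    if proc.state != "running" and proc.return_code is None:
        return "terminate"
    return "ok"


def run_old_order(proc: TaskProcess) -> List[str]:
    """Mini scheduler runs inside the task process, before the process exits."""
    verdicts = []
    proc.state = "success"                    # task is marked successful
    verdicts.append(heartbeat_verdict(proc))  # a slow mini scheduler overlaps a heartbeat
    proc.return_code = 0                      # process exits only after the mini scheduler
    return verdicts


def run_new_order(proc: TaskProcess) -> List[str]:
    """Mini scheduler runs in handle_task_exit, after the process has exited."""
    verdicts = []
    proc.state = "success"
    proc.return_code = 0                      # process exits right after marking success
    # Even if a heartbeat check ran now, the return code is already known.
    verdicts.append(heartbeat_verdict(proc))
    return verdicts


print(run_old_order(TaskProcess()))  # ['terminate']
print(run_new_order(TaskProcess()))  # ['ok']
```

In this model the only difference between the two orders is whether the return code exists when the heartbeat looks at the task, which is the ordering change the commit message describes.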
---
 tests/jobs/test_local_task_job.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 14c74ce..060bce8 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -842,12 +842,12 @@ class TestLocalTaskJob:
             op1 = PythonOperator(task_id='dummy', python_callable=lambda: True)
 
         session = settings.Session()
-        dag_maker.make_dagmodel(
-            has_task_concurrency_limits=False,
-            next_dagrun_create_after=dag.following_schedule(DEFAULT_DATE),
-            is_active=True,
-            is_paused=True,
-        )
+        dagmodel = dag_maker.dag_model
+        dagmodel.next_dagrun_create_after = dag.following_schedule(DEFAULT_DATE)
+        dagmodel.is_paused = True
+        session.merge(dagmodel)
+        session.flush()
+
         # Write Dag to DB
         dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
         dagbag.bag_dag(dag, root_dag=dag)