You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/03/04 17:12:59 UTC

[GitHub] [airflow] kaxil commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

kaxil commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r387810889
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -752,37 +758,35 @@ def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]) -> Li
         return dags
 
     @provide_session
-    def kill_zombies(self, dagbag, zombies, session=None):
+    def execute_on_failure_callbacks(self, dagbag, failure_callback_requests, session=None):
         """
-        Fail given zombie tasks, which are tasks that haven't
-        had a heartbeat for too long, in the current DagBag.
+        Execute on failure callbacks. These objects can come from SchedulerJob or from
+        DagFileProcessorManager.
 
-        :param zombies: zombie task instances to kill.
-        :type zombies: List[airflow.models.taskinstance.SimpleTaskInstance]
+        :param failure_callback_requests: failure callbacks to execute
+        :type failure_callback_requests: List[airflow.utils.dag_processing.FailureCallbackRequest]
         :param session: DB session.
         """
         TI = models.TaskInstance
 
-        for zombie in zombies:
-            if zombie.dag_id in dagbag.dags:
-                dag = dagbag.dags[zombie.dag_id]
-                if zombie.task_id in dag.task_ids:
-                    task = dag.get_task(zombie.task_id)
-                    ti = TI(task, zombie.execution_date)
+        for request in failure_callback_requests:
+            if request.simple_task_instance.dag_id in dagbag.dags:
+                dag = dagbag.dags[request.simple_task_instance.dag_id]
+                if request.simple_task_instance.task_id in dag.task_ids:
+                    task = dag.get_task(request.simple_task_instance.task_id)
+                    ti = TI(task, request.simple_task_instance.execution_date)
                     # Get properties needed for failure handling from SimpleTaskInstance.
-                    ti.start_date = zombie.start_date
-                    ti.end_date = zombie.end_date
-                    ti.try_number = zombie.try_number
-                    ti.state = zombie.state
+                    ti.start_date = request.simple_task_instance.start_date
+                    ti.end_date = request.simple_task_instance.end_date
+                    ti.try_number = request.simple_task_instance.try_number
+                    ti.state = request.simple_task_instance.state
                     ti.test_mode = self.UNIT_TEST_MODE
-                    ti.handle_failure("{} detected as zombie".format(ti),
-                                      ti.test_mode, ti.get_template_context())
-                    self.log.info('Marked zombie job %s as %s', ti, ti.state)
-                    Stats.incr('zombies_killed')
 
 Review comment:
   This change needs a mention in the API, and we also need to remove `zombies_killed` from the metrics list, since this counter is no longer incremented here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services