Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/17 03:50:35 UTC

[GitHub] [airflow] tanelk commented on a diff in pull request #21871: Add map_index to pods launched by KubernetesExecutor

tanelk commented on code in PR #21871:
URL: https://github.com/apache/airflow/pull/21871#discussion_r851698524


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -452,50 +456,55 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
         self.log.debug("Clearing tasks that have not been launched")
         if not self.kube_client:
             raise AirflowException(NOT_STARTED_MESSAGE)
-        queued_tasks = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
-        self.log.info('Found %s queued task instances', len(queued_tasks))
+        queued_tis: List[TaskInstance] = (
+            session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
+        )
+        self.log.info('Found %s queued task instances', len(queued_tis))
 
         # Go through the "last seen" dictionary and clean out old entries
         allowed_age = self.kube_config.worker_pods_queued_check_interval * 3
         for key, timestamp in list(self.last_handled.items()):
             if time.time() - timestamp > allowed_age:
                 del self.last_handled[key]
 
-        for task in queued_tasks:
-            self.log.debug("Checking task %s", task)
+        for ti in queued_tis:
+            self.log.debug("Checking task instance %s", ti)
 
             # Check to see if we've handled it ourselves recently
-            if task.key in self.last_handled:
+            if ti.key in self.last_handled:
                 continue
 
             # Build the pod selector
             base_label_selector = (
-                f"dag_id={pod_generator.make_safe_label_value(task.dag_id)},"
-                f"task_id={pod_generator.make_safe_label_value(task.task_id)},"
-                f"airflow-worker={pod_generator.make_safe_label_value(str(task.queued_by_job_id))}"
+                f"dag_id={pod_generator.make_safe_label_value(ti.dag_id)},"
+                f"task_id={pod_generator.make_safe_label_value(ti.task_id)},"
+                f"airflow-worker={pod_generator.make_safe_label_value(str(ti.queued_by_job_id))}"
             )
+            if ti.map_index >= 0:
+                # Old tasks _couldn't_ be mapped, so we don't have to worry about compat
+                base_label_selector += f',map_index={ti.map_index}'
             kwargs = dict(label_selector=base_label_selector)
             if self.kube_config.kube_client_request_args:
                 kwargs.update(**self.kube_config.kube_client_request_args)
 
             # Try run_id first
-            kwargs['label_selector'] += ',run_id=' + pod_generator.make_safe_label_value(task.run_id)
+            kwargs['label_selector'] += ',run_id=' + pod_generator.make_safe_label_value(ti.run_id)
             pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
             if pod_list.items:
                 continue
             # Fallback to old style of using execution_date
             kwargs['label_selector'] = (
                 f'{base_label_selector},'
-                f'execution_date={pod_generator.datetime_to_label_safe_datestring(task.execution_date)}'
+                f'execution_date={pod_generator.datetime_to_label_safe_datestring(ti.execution_date)}'
             )
             pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
             if pod_list.items:
                 continue
-            self.log.info('TaskInstance: %s found in queued state but was not launched, rescheduling', task)
+            self.log.info('TaskInstance: %s found in queued state but was not launched, rescheduling', ti)
             session.query(TaskInstance).filter(
-                TaskInstance.dag_id == task.dag_id,
-                TaskInstance.task_id == task.task_id,
-                TaskInstance.run_id == task.run_id,
+                TaskInstance.dag_id == ti.dag_id,
+                TaskInstance.task_id == ti.task_id,
+                TaskInstance.run_id == ti.run_id,
             ).update({TaskInstance.state: State.SCHEDULED})

Review Comment:
   @ashb, doesn't this update statement also need a `map_index` check?
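   For illustration, something along these lines (just a sketch of the suggestion; `session`, `ti`, `TaskInstance` and `State` are the names already in scope in this method):

   ```python
   # Sketch only: without a map_index predicate, this UPDATE would move every
   # mapped task instance sharing (dag_id, task_id, run_id) back to SCHEDULED,
   # not just the one `ti` being checked.
   session.query(TaskInstance).filter(
       TaskInstance.dag_id == ti.dag_id,
       TaskInstance.task_id == ti.task_id,
       TaskInstance.run_id == ti.run_id,
       TaskInstance.map_index == ti.map_index,
   ).update({TaskInstance.state: State.SCHEDULED})
   ```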



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org