Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/29 17:32:19 UTC

[GitHub] [airflow] jedcunningham commented on a change in pull request #19572: Simplify KubernetesPodOperator

jedcunningham commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r758523268



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?

Review comment:
       That's already done in `_get_pod_identifying_label_string`.
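
       For context, a rough sketch of the selector that helper builds (label values here are hypothetical; `POD_CHECKED_KEY` is assumed to be `already_checked`, per the diff at the bottom of this thread). The trailing clause is what already filters out checked pods:

       ```python
       # The helper drops try_number and appends the already_checked exclusion.
       labels = {'dag_id': 'my_dag', 'task_id': 'my_task', 'try_number': '2'}
       label_strings = [f'{k}={v}' for k, v in sorted(labels.items()) if k != 'try_number']
       selector = ','.join(label_strings) + ',already_checked!=True'
       # -> 'dag_id=my_dag,task_id=my_task,already_checked!=True'
       ```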

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+
+        if len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info(f"Found matching pod {pod.metadata.name}")
+            self._compare_try_numbers(context, pod)
+
+            # In case of failed pods, should reattach the first time, but only once
+            # as the task will have already failed.
+            if not pod.metadata.labels.get(self.POD_CHECKED_KEY):
+                return pod

Review comment:
       ```suggestion
               return pod
       ```

       We already exclude the `already_checked` pods, so I don't think this conditional does anything any longer?

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+
+        if len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info(f"Found matching pod {pod.metadata.name}")

Review comment:
       ```suggestion
               self.log.info("Found matching pod %s", pod.metadata.name)
       ```
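
       With %-style args the formatting is deferred to the logging framework and skipped entirely when the level is disabled. A minimal sketch of the difference:

       ```python
       import logging

       log = logging.getLogger(__name__)
       log.setLevel(logging.WARNING)

       name = 'my-pod'
       log.info(f"Found matching pod {name}")   # f-string is rendered even though INFO is suppressed
       log.info("Found matching pod %s", name)  # %-args are only rendered if the record is emitted
       ```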

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -51,10 +55,23 @@ def should_retry_start_pod(exception: Exception) -> bool:
 class PodStatus:
     """Status of the PODs"""
 
-    PENDING = 'pending'
-    RUNNING = 'running'
-    FAILED = 'failed'
-    SUCCEEDED = 'succeeded'
+    PENDING = 'Pending'
+    RUNNING = 'Running'
+    FAILED = 'Failed'
+    SUCCEEDED = 'Succeeded'
+
+    terminal_states = {FAILED, SUCCEEDED}
+
+
+def container_is_running(event: V1Pod, container_name: str):

Review comment:
       `event` being a `V1Pod` seems odd to me? Should we rename this to `pod`?

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -51,10 +55,23 @@ def should_retry_start_pod(exception: Exception) -> bool:
 class PodStatus:
     """Status of the PODs"""
 
-    PENDING = 'pending'
-    RUNNING = 'running'
-    FAILED = 'failed'
-    SUCCEEDED = 'succeeded'
+    PENDING = 'Pending'
+    RUNNING = 'Running'
+    FAILED = 'Failed'
+    SUCCEEDED = 'Succeeded'
+
+    terminal_states = {FAILED, SUCCEEDED}
+
+
+def container_is_running(event: V1Pod, container_name: str):
+    container_statuses = event.status.container_statuses if event and event.status else None
+    if not container_statuses:
+        return False
+    container_status = next(iter([x for x in container_statuses if x.name == container_name]), None)
+    if not container_status:
+        return False
+    else:
+        return container_status.state.running is not None

Review comment:
       ```suggestion
       return container_status.state.running is not None
       ```

       nit
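
       Taken together with the `event` -> `pod` rename suggested above, a sketch of how the whole helper could read (not tied to this PR's final shape):

       ```python
       from kubernetes.client.models import V1Pod

       def container_is_running(pod: V1Pod, container_name: str) -> bool:
           """Return True if the named container in the pod is currently running."""
           container_statuses = pod.status.container_statuses if pod and pod.status else None
           if not container_statuses:
               return False
           container_status = next((x for x in container_statuses if x.name == container_name), None)
           if not container_status:
               return False
           return container_status.state.running is not None
       ```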

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -501,83 +572,64 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
         if self.do_xcom_push:
             self.log.debug("Adding xcom sidecar to task %s", self.task_id)
             pod = xcom_sidecar.add_xcom_sidecar(pod)
-        return pod
-
-    def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]:
-        """
-        Creates a new pod and monitors for duration of task
 
-        :param labels: labels used to track pod
-        :param launcher: pod launcher that will manage launching and monitoring pods
-        :return:
-        """
-        self.log.debug(
-            "Adding KubernetesPodOperator labels to pod before launch for task %s", self.task_id
-        )
+        labels = self._create_labels_for_pod(context)
+        self.log.info(f"creating pod with labels {labels} and launcher {self.launcher}")

Review comment:
       ```suggestion
        self.log.info("creating pod with labels %s and launcher %s", labels, self.launcher)
       ```

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+
+        if len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info(f"Found matching pod {pod.metadata.name}")
+            self._compare_try_numbers(context, pod)
+
+            # In case of failed pods, should reattach the first time, but only once
+            # as the task will have already failed.
+            if not pod.metadata.labels.get(self.POD_CHECKED_KEY):
+                return pod
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+        if self.reattach_on_restart:
+            pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
+            if pod:
+                if not self.is_delete_operator_pod:
+                    self.patch_already_checked(pod)

Review comment:
       Doesn't this mean we can only reattach once? I think we should be patching this in cleanup, not when we reattach.
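
       Roughly the shape I have in mind, as a sketch (assuming `patch_already_checked` stays the helper that applies the `already_checked` label):

       ```python
       def cleanup(self, pod, remote_pod):
           # Mark the pod as checked only once the task has actually finished,
           # so it can be reattached across any number of scheduler restarts.
           if not self.is_delete_operator_pod:
               self.patch_already_checked(pod)
           self.process_pod_deletion(pod)
       ```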

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +131,102 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> None:
+        """
+        Launches the pod asynchronously.
+
+        :param pod:
+        :return:
+        """
+        pod = self.run_pod_async(pod)
+        return pod
+
+    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
         """
-        Launches the pod synchronously and waits for completion.
+        Waits for the pod to reach phase other than ``Pending``
 
         :param pod:
         :param startup_timeout: Timeout (in seconds) for startup of the pod
             (if pod is pending for too long, fails task)
         :return:
         """
-        resp = self.run_pod_async(pod)
         curr_time = dt.now()
-        if resp.status.start_time is None:
-            while self.pod_not_started(pod):
-                self.log.warning("Pod not yet started: %s", pod.metadata.name)
-                delta = dt.now() - curr_time
-                if delta.total_seconds() >= startup_timeout:
-                    msg = (
-                        f"Pod took longer than {startup_timeout} seconds to start. "
-                        "Check the pod events in kubernetes to determine why."
-                    )
-                    raise AirflowException(msg)
-                time.sleep(1)
-
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optional[str]]:
+        while True:
+            event = self.read_pod(pod)
+            if event.status.phase != PodStatus.PENDING:
+                break
+            self.log.warning("Pod not yet started: %s", pod.metadata.name)
+            delta = dt.now() - curr_time
+            if delta.total_seconds() >= startup_timeout:
+                msg = (
+                    f"Pod took longer than {startup_timeout} seconds to start. "
+                    "Check the pod events in kubernetes to determine why."
+                )
+                raise PodLaunchFailedException(msg)
+            time.sleep(1)
+
+    def follow_container_logs(self, pod: V1Pod, container_name: str):
+        """
+        Follows the logs of container and streams to airflow logging.
+        Returns when container exits.
+        """
+        container_stopped = False
+        read_logs_since_sec = None
+        last_log_time = None
+        while True:
+            try:
+                logs = self.read_pod_logs(
+                    pod=pod,
+                    container_name=container_name,
+                    timestamps=True,
+                    since_seconds=read_logs_since_sec,
+                )
+                for line in logs:  # type: bytes
+                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                    self.log.info(message)
+                    if timestamp:
+                        last_log_time = timestamp
+            except BaseHTTPError:  # Catches errors like ProtocolError(TimeoutError).
+                self.log.warning(
+                    'Failed to read logs for pod %s',
+                    pod.metadata.name,
+                    exc_info=True,
+                )
+
+            if container_stopped is True:
+                break
+
+            if last_log_time:
+                delta = pendulum.now() - last_log_time
+                read_logs_since_sec = math.ceil(delta.total_seconds())
+
+            time.sleep(1)
+
+            if not self.container_is_running(pod, container_name=container_name):
+                container_stopped = True  # fetch logs once more and exit
+            else:
+                self.log.info('Container %s has state %s', pod.metadata.name, State.RUNNING)

Review comment:
       I realize this was how it was before, but `State.RUNNING` (as in, Airflow's task running state) seems like the wrong thing to use here, no? Maybe this should be `pod.status.phase` instead, like `await_pod_completion`?
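
       Something like this, reusing `read_pod` (a sketch; exact wording is up for grabs):

       ```python
       # Inside the polling loop, instead of logging Airflow's State.RUNNING:
       remote_pod = self.read_pod(pod)
       self.log.info('Pod %s has phase %s', pod.metadata.name, remote_pod.status.phase)
       ```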

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +131,102 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> None:
+        """
+        Launches the pod asynchronously.
+
+        :param pod:
+        :return:
+        """
+        pod = self.run_pod_async(pod)
+        return pod

Review comment:
       ```suggestion
           return self.run_pod_async(pod)
       ```

       nit

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +340,143 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context):
+        """Returns an already-running pod for this task instance if one exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        )
+        if len(pod_list.items) > 1:
+            # todo: should we ignore "checked" pods here?
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+
+        if len(pod_list.items) == 1:
+            pod = pod_list.items[0]
+            self.log.info(f"Found matching pod {pod.metadata.name}")
+            self._compare_try_numbers(context, pod)
+
+            # In case of failed pods, should reattach the first time, but only once
+            # as the task will have already failed.
+            if not pod.metadata.labels.get(self.POD_CHECKED_KEY):
+                return pod
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+        if self.reattach_on_restart:
+            pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
+            if pod:
+                if not self.is_delete_operator_pod:
+                    self.patch_already_checked(pod)
+                return pod
+        self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
+        self.launcher.create_pod(pod=pod_request_obj)
+        return pod_request_obj
+
+    def await_pod_start(self, pod):
         try:
-            if self.in_cluster is not None:
-                client = kube_client.get_kube_client(
-                    in_cluster=self.in_cluster,
-                    cluster_context=self.cluster_context,
-                    config_file=self.config_file,
-                )
-            else:
-                client = kube_client.get_kube_client(
-                    cluster_context=self.cluster_context, config_file=self.config_file
-                )
-
-            self.client = client
-
-            self.pod = self.create_pod_request_obj()
-            self.namespace = self.pod.metadata.namespace
-
-            # Add combination of labels to uniquely identify a running pod
-            labels = self.create_labels_for_pod(context)
-
-            label_selector = self._get_pod_identifying_label_string(labels)
+            self.launcher.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
+        except PodLaunchFailedException:
+            if self.log_events_on_failure:
+                for event in self.launcher.read_pod_events(pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            raise
 
-            pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+    def extract_xcom(self, pod):
+        """Retrieves xcom value and kills xcom sidecar container"""
+        result = self.launcher.extract_xcom(pod)
+        self.log.info(result)
+        # todo: add tests re handling of json vs non-json-serializable values
+        return json.loads(result)
 
-            if len(pod_list.items) > 1 and self.reattach_on_restart:
-                raise AirflowException(
-                    f'More than one pod running with labels: {label_selector}'
+    def execute(self, context):
+        base_container_stopped = False
+        remote_pod = None
+        try:
+            self.pod_request_obj = self.build_pod_request_obj(context)
+            self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
+                pod_request_obj=self.pod_request_obj,
+                context=context,
+            )
+            self.await_pod_start(pod=self.pod)
+            if self.get_logs:
+                base_container_stopped = self.launcher.follow_container_logs(
+                    pod=self.pod,
+                    container_name=self.BASE_CONTAINER_NAME,
                 )
 
-            launcher = self.create_pod_launcher()
+            # if not getting logs, still need to wait for base container before getting xcom
+            if not base_container_stopped:
+                self.launcher.await_container(pod=self.pod, container_name=self.BASE_CONTAINER_NAME)
 
-            if len(pod_list.items) == 1:
-                try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
-                final_state, remote_pod, result = self.handle_pod_overlap(
-                    labels, try_numbers_match, launcher, pod_list.items[0]
-                )
-            else:
-                self.log.info("creating pod with labels %s and launcher %s", labels, launcher)
-                final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
-            if final_state != State.SUCCESS:
-                raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
-            context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name)
-            context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace)
+            if self.do_xcom_push:
+                result = self.extract_xcom(pod=self.pod)
+            remote_pod = self.launcher.await_pod_completion(self.pod)
+        finally:
+            self.cleanup(
+                pod=self.pod or self.pod_request_obj,
+                remote_pod=remote_pod,
+            )
+        if self.do_xcom_push:
+            ti = context['ti']
+            if remote_pod:
+                ti.xcom_push(key='pod_name', value=remote_pod.metadata.name)
+                ti.xcom_push(key='pod_namespace', value=remote_pod.metadata.namespace)
             return result
-        except AirflowException as ex:
-            raise AirflowException(f'Pod Launching failed: {ex}')
 
-    def handle_pod_overlap(
-        self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
-    ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
-        """
+    def cleanup(self, pod, remote_pod):
+        with _suppress_with_logging(self, Exception):
+            self.process_pod_deletion(pod)
 
-        In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
-        this function will either continue to monitor the existing pod or launch a new pod
-        based on the `reattach_on_restart` parameter.
-
-        :param labels: labels used to determine if a pod is repeated
-        :type labels: dict
-        :param try_numbers_match: do the try numbers match? Only needed for logging purposes
-        :type try_numbers_match: bool
-        :param launcher: PodLauncher
-        :param pod: Pod found with matching labels
-        """
-        if try_numbers_match:
-            log_line = f"found a running pod with labels {labels} and the same try_number."
-        else:
-            log_line = f"found a running pod with labels {labels} but a different try_number."
-
-        # In case of failed pods, should reattach the first time, but only once
-        # as the task will have already failed.
-        if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
-            log_line += " Will attach to this pod and monitor instead of starting new one"
-            self.log.info(log_line)
-            self.pod = pod
-            final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod)
+        pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
+        if pod_phase != PodStatus.SUCCEEDED:
+            if self.log_events_on_failure:
+                with _suppress_with_logging(self, Exception):
+                    for event in self.launcher.read_pod_events(pod).items:
+                        self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            if not self.is_delete_operator_pod:
+                with _suppress_with_logging(self, Exception):
+                    self.patch_already_checked(pod)
+            raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}')
+
+    def process_pod_deletion(self, pod):
+        if self.is_delete_operator_pod:
+            self.log.info("deleting pod")
+            self.launcher.delete_pod(pod)
         else:
-            log_line += f"creating pod with labels {labels} and launcher {launcher}"
-            self.log.info(log_line)
-            final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
-        return final_state, remote_pod, result
+            self.log.info("skipping pod delete")
 
-    @staticmethod
-    def _get_pod_identifying_label_string(labels) -> str:
+    def _get_pod_identifying_label_string(self, labels) -> str:
         label_strings = [
             f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
         ]
-        return ','.join(label_strings) + ',already_checked!=True'
-
-    @staticmethod
-    def _try_numbers_match(context, pod) -> bool:
-        return pod.metadata.labels['try_number'] == context['ti'].try_number
+        return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True'
+
+    def _compare_try_numbers(self, context, pod):
+        tries_match = pod.metadata.labels['try_number'] == context['ti'].try_number
+        self.log.info(
+            ' '.join(
+                [
+                    f"found a running pod with labels {pod.metadata.labels}",
+                    "and the same try_number." if tries_match else "but a different try_number.",
+                ]
+            )

Review comment:
       ```suggestion
            "found a running pod with labels %s %s",
            pod.metadata.labels,
            "and the same try_number." if tries_match else "but a different try_number.",
       ```

       Might need a little more formatting, but don't use an f-string, and you don't need to do the join/list thing either. (The ternary does need to be its own argument or be parenthesized; implicit string concatenation happens before the conditional, so the prefix would be dropped on the `else` branch.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org