Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/18 20:29:18 UTC

[GitHub] [airflow] BasPH commented on a change in pull request #19572: Simplify ``KubernetesPodOperator``

BasPH commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r771860213



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +339,133 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+        """Returns an already-running pod for this task instance if one exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        ).items
+
+        num_pods = len(pod_list)
+        if num_pods > 1:
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+        elif num_pods == 1:
+            pod = pod_list[0]
+            self.log.info("Found matching pod %s", pod.metadata.name)
+            self._compare_try_numbers(context, pod)
+            return pod
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+        if self.reattach_on_restart:
+            pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
+            if pod:
+                return pod
+        self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
+        self.launcher.create_pod(pod=pod_request_obj)
+        return pod_request_obj
+
+    def await_pod_start(self, pod):
         try:
-            if self.in_cluster is not None:
-                client = kube_client.get_kube_client(
-                    in_cluster=self.in_cluster,
-                    cluster_context=self.cluster_context,
-                    config_file=self.config_file,
-                )
-            else:
-                client = kube_client.get_kube_client(
-                    cluster_context=self.cluster_context, config_file=self.config_file
-                )
-
-            self.client = client
-
-            self.pod = self.create_pod_request_obj()
-            self.namespace = self.pod.metadata.namespace
-
-            # Add combination of labels to uniquely identify a running pod
-            labels = self.create_labels_for_pod(context)
-
-            label_selector = self._get_pod_identifying_label_string(labels)
-
-            pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+            self.launcher.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
+        except PodLaunchFailedException:
+            if self.log_events_on_failure:
+                for event in self.launcher.read_pod_events(pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            raise
 
-            if len(pod_list.items) > 1 and self.reattach_on_restart:
-                raise AirflowException(
-                    f'More than one pod running with labels: {label_selector}'
-                )
+    def extract_xcom(self, pod):
+        """Retrieves xcom value and kills xcom sidecar container"""
+        result = self.launcher.extract_xcom(pod)
+        self.log.info(result)
+        return json.loads(result)
 
-            launcher = self.create_pod_launcher()
+    def execute(self, context):
+        remote_pod = None
+        try:
+            self.pod_request_obj = self.build_pod_request_obj(context)
+            self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
+                pod_request_obj=self.pod_request_obj,
+                context=context,
+            )
+            self.await_pod_start(pod=self.pod)
 
-            if len(pod_list.items) == 1:
-                try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
-                final_state, remote_pod, result = self.handle_pod_overlap(
-                    labels, try_numbers_match, launcher, pod_list.items[0]
+            if self.get_logs:
+                self.launcher.follow_container_logs(
+                    pod=self.pod,
+                    container_name=self.BASE_CONTAINER_NAME,
                 )
             else:
-                self.log.info("creating pod with labels %s and launcher %s", labels, launcher)
-                final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
-            if final_state != State.SUCCESS:
-                raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
-            context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name)
-            context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace)
-            return result
-        except AirflowException as ex:
-            raise AirflowException(f'Pod Launching failed: {ex}')
+                self.launcher.await_container_completion(
+                    pod=self.pod, container_name=self.BASE_CONTAINER_NAME
+                )
 
-    def handle_pod_overlap(
-        self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
-    ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
-        """
+            if self.do_xcom_push:
+                result = self.extract_xcom(pod=self.pod)
+            remote_pod = self.launcher.await_pod_completion(self.pod)
+        finally:
+            self.cleanup(
+                pod=self.pod or self.pod_request_obj,
+                remote_pod=remote_pod,
+            )
+        if self.do_xcom_push:
+            ti = context['ti']
+            if remote_pod:
+                ti.xcom_push(key='pod_name', value=remote_pod.metadata.name)
+                ti.xcom_push(key='pod_namespace', value=remote_pod.metadata.namespace)
+            return result
 
-        In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
-        this function will either continue to monitor the existing pod or launch a new pod
-        based on the `reattach_on_restart` parameter.
+    def cleanup(self, pod, remote_pod):

Review comment:
       Are these both V1Pod types?
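
    If so, annotating the signature would make that explicit. A minimal sketch (hypothetical, not part of the PR; it assumes `remote_pod` can be `None`, since `execute` initializes it to `None` before the `try` block):

    ```python
    # Hypothetical annotation: remote_pod is Optional because cleanup() runs in
    # the `finally` block, where await_pod_completion() may never have executed.
    def cleanup(self, pod: k8s.V1Pod, remote_pod: Optional[k8s.V1Pod]) -> None:
        ...
    ```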

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +129,99 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> V1Pod:
+        """Launches the pod asynchronously."""
+        return self.run_pod_async(pod)
+
+    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
         """
-        Launches the pod synchronously and waits for completion.
+        Waits for the pod to reach phase other than ``Pending``
 
         :param pod:
         :param startup_timeout: Timeout (in seconds) for startup of the pod
             (if pod is pending for too long, fails task)
         :return:
         """
-        resp = self.run_pod_async(pod)
         curr_time = dt.now()
-        if resp.status.start_time is None:
-            while self.pod_not_started(pod):
-                self.log.warning("Pod not yet started: %s", pod.metadata.name)
-                delta = dt.now() - curr_time
-                if delta.total_seconds() >= startup_timeout:
-                    msg = (
-                        f"Pod took longer than {startup_timeout} seconds to start. "
-                        "Check the pod events in kubernetes to determine why."
-                    )
-                    raise AirflowException(msg)
-                time.sleep(1)
-
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optional[str]]:
+        while True:
+            remote_pod = self.read_pod(pod)
+            if remote_pod.status.phase != PodStatus.PENDING:
+                break
+            self.log.warning("Pod not yet started: %s", pod.metadata.name)
+            delta = dt.now() - curr_time
+            if delta.total_seconds() >= startup_timeout:
+                msg = (
+                    f"Pod took longer than {startup_timeout} seconds to start. "
+                    "Check the pod events in kubernetes to determine why."
+                )
+                raise PodLaunchFailedException(msg)
+            time.sleep(1)
+
+    def follow_container_logs(self, pod: V1Pod, container_name: str):
+        """
+        Follows the logs of container and streams to airflow logging.
+        Returns when container exits.
+        """
+        container_stopped = False
+        read_logs_since_sec = None
+        last_log_time = None
+
+        # `read_pod_logs` follows the logs so we shouldn't necessarily _need_ to loop
+        # but in a long-running process we might lose connectivity and this way we
+        # can resume following the logs
+        while True:
+            try:
+                logs = self.read_pod_logs(
+                    pod=pod,
+                    container_name=container_name,
+                    timestamps=True,
+                    since_seconds=read_logs_since_sec,
+                )
+                for line in logs:  # type: bytes
+                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                    self.log.info(message)
+                    if timestamp:
+                        last_log_time = timestamp
+            except BaseHTTPError:  # Catches errors like ProtocolError(TimeoutError).
+                self.log.warning(
+                    'Failed to read logs for pod %s',
+                    pod.metadata.name,
+                    exc_info=True,
+                )
+
+            if container_stopped is True:

Review comment:
    ```suggestion
            if container_stopped:
    ```

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +339,133 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+        """Returns an already-running pod for this task instance if one exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        ).items
+
+        num_pods = len(pod_list)
+        if num_pods > 1:
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+        elif num_pods == 1:
+            pod = pod_list[0]
+            self.log.info("Found matching pod %s", pod.metadata.name)
+            self._compare_try_numbers(context, pod)
+            return pod
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+        if self.reattach_on_restart:
+            pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
+            if pod:
+                return pod
+        self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
+        self.launcher.create_pod(pod=pod_request_obj)
+        return pod_request_obj
+
+    def await_pod_start(self, pod):
         try:
-            if self.in_cluster is not None:
-                client = kube_client.get_kube_client(
-                    in_cluster=self.in_cluster,
-                    cluster_context=self.cluster_context,
-                    config_file=self.config_file,
-                )
-            else:
-                client = kube_client.get_kube_client(
-                    cluster_context=self.cluster_context, config_file=self.config_file
-                )
-
-            self.client = client
-
-            self.pod = self.create_pod_request_obj()
-            self.namespace = self.pod.metadata.namespace
-
-            # Add combination of labels to uniquely identify a running pod
-            labels = self.create_labels_for_pod(context)
-
-            label_selector = self._get_pod_identifying_label_string(labels)
-
-            pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+            self.launcher.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
+        except PodLaunchFailedException:
+            if self.log_events_on_failure:
+                for event in self.launcher.read_pod_events(pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            raise
 
-            if len(pod_list.items) > 1 and self.reattach_on_restart:
-                raise AirflowException(
-                    f'More than one pod running with labels: {label_selector}'
-                )
+    def extract_xcom(self, pod):
+        """Retrieves xcom value and kills xcom sidecar container"""
+        result = self.launcher.extract_xcom(pod)
+        self.log.info(result)
+        return json.loads(result)
 
-            launcher = self.create_pod_launcher()
+    def execute(self, context):
+        remote_pod = None
+        try:
+            self.pod_request_obj = self.build_pod_request_obj(context)
+            self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
+                pod_request_obj=self.pod_request_obj,
+                context=context,
+            )
+            self.await_pod_start(pod=self.pod)
 
-            if len(pod_list.items) == 1:
-                try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
-                final_state, remote_pod, result = self.handle_pod_overlap(
-                    labels, try_numbers_match, launcher, pod_list.items[0]
+            if self.get_logs:
+                self.launcher.follow_container_logs(
+                    pod=self.pod,
+                    container_name=self.BASE_CONTAINER_NAME,
                 )
             else:
-                self.log.info("creating pod with labels %s and launcher %s", labels, launcher)
-                final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
-            if final_state != State.SUCCESS:
-                raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
-            context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name)
-            context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace)
-            return result
-        except AirflowException as ex:
-            raise AirflowException(f'Pod Launching failed: {ex}')
+                self.launcher.await_container_completion(
+                    pod=self.pod, container_name=self.BASE_CONTAINER_NAME
+                )
 
-    def handle_pod_overlap(
-        self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
-    ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
-        """
+            if self.do_xcom_push:
+                result = self.extract_xcom(pod=self.pod)
+            remote_pod = self.launcher.await_pod_completion(self.pod)
+        finally:
+            self.cleanup(
+                pod=self.pod or self.pod_request_obj,
+                remote_pod=remote_pod,
+            )
+        if self.do_xcom_push:
+            ti = context['ti']
+            if remote_pod:
+                ti.xcom_push(key='pod_name', value=remote_pod.metadata.name)
+                ti.xcom_push(key='pod_namespace', value=remote_pod.metadata.namespace)
+            return result
 
-        In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
-        this function will either continue to monitor the existing pod or launch a new pod
-        based on the `reattach_on_restart` parameter.
+    def cleanup(self, pod, remote_pod):
+        with _suppress(Exception):
+            self.process_pod_deletion(pod)
 
-        :param labels: labels used to determine if a pod is repeated
-        :type labels: dict
-        :param try_numbers_match: do the try numbers match? Only needed for logging purposes
-        :type try_numbers_match: bool
-        :param launcher: PodLauncher
-        :param pod: Pod found with matching labels
-        """
-        if try_numbers_match:
-            log_line = f"found a running pod with labels {labels} and the same try_number."
-        else:
-            log_line = f"found a running pod with labels {labels} but a different try_number."
-
-        # In case of failed pods, should reattach the first time, but only once
-        # as the task will have already failed.
-        if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
-            log_line += " Will attach to this pod and monitor instead of starting new one"
-            self.log.info(log_line)
-            self.pod = pod
-            final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod)
+        pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
+        if pod_phase != PodStatus.SUCCEEDED:
+            if self.log_events_on_failure:
+                with _suppress(Exception):
+                    for event in self.launcher.read_pod_events(pod).items:
+                        self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            if not self.is_delete_operator_pod:
+                with _suppress(Exception):
+                    self.patch_already_checked(pod)
+            raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}')
+
+    def process_pod_deletion(self, pod):
+        if self.is_delete_operator_pod:
+            self.log.info("deleting pod: %s", pod.metadata.name)

Review comment:
    ```suggestion
            self.log.info("Deleting pod: %s", pod.metadata.name)
    ```

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +129,99 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> V1Pod:
+        """Launches the pod asynchronously."""
+        return self.run_pod_async(pod)
+
+    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
         """
-        Launches the pod synchronously and waits for completion.
+        Waits for the pod to reach phase other than ``Pending``
 
         :param pod:
         :param startup_timeout: Timeout (in seconds) for startup of the pod
             (if pod is pending for too long, fails task)
         :return:
         """
-        resp = self.run_pod_async(pod)
         curr_time = dt.now()
-        if resp.status.start_time is None:
-            while self.pod_not_started(pod):
-                self.log.warning("Pod not yet started: %s", pod.metadata.name)
-                delta = dt.now() - curr_time
-                if delta.total_seconds() >= startup_timeout:
-                    msg = (
-                        f"Pod took longer than {startup_timeout} seconds to start. "
-                        "Check the pod events in kubernetes to determine why."
-                    )
-                    raise AirflowException(msg)
-                time.sleep(1)
-
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optional[str]]:
+        while True:
+            remote_pod = self.read_pod(pod)
+            if remote_pod.status.phase != PodStatus.PENDING:
+                break
+            self.log.warning("Pod not yet started: %s", pod.metadata.name)
+            delta = dt.now() - curr_time
+            if delta.total_seconds() >= startup_timeout:
+                msg = (
+                    f"Pod took longer than {startup_timeout} seconds to start. "
+                    "Check the pod events in kubernetes to determine why."
+                )
+                raise PodLaunchFailedException(msg)
+            time.sleep(1)
+
+    def follow_container_logs(self, pod: V1Pod, container_name: str):
+        """
+        Follows the logs of container and streams to airflow logging.
+        Returns when container exits.
+        """
+        container_stopped = False
+        read_logs_since_sec = None
+        last_log_time = None
+
+        # `read_pod_logs` follows the logs so we shouldn't necessarily _need_ to loop
+        # but in a long-running process we might lose connectivity and this way we
+        # can resume following the logs
+        while True:
+            try:
+                logs = self.read_pod_logs(
+                    pod=pod,
+                    container_name=container_name,
+                    timestamps=True,
+                    since_seconds=read_logs_since_sec,
+                )
+                for line in logs:  # type: bytes
+                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                    self.log.info(message)
+                    if timestamp:
+                        last_log_time = timestamp
+            except BaseHTTPError:  # Catches errors like ProtocolError(TimeoutError).
+                self.log.warning(
+                    'Failed to read logs for pod %s',
+                    pod.metadata.name,
+                    exc_info=True,
+                )
+
+            if container_stopped is True:
+                break
+
+            if last_log_time:
+                delta = pendulum.now() - last_log_time
+                read_logs_since_sec = math.ceil(delta.total_seconds())
+
+            time.sleep(1)
+
+            if self.container_is_running(pod, container_name=container_name):
+                self.log.info('Container %s is running', pod.metadata.name)
+                self.log.warning('Pod %s log read interrupted', pod.metadata.name)

Review comment:
       I assume the "interrupted" refers to the `sleep(1)`? Is that necessary to log (it feels like an internal detail not useful to the user), or could you reword the message for clarity?
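
    For example, something along these lines might read better (wording is only a sketch):

    ```python
    # Hypothetical rewording: one message explaining the reconnect, instead of
    # two messages describing internal loop state.
    self.log.warning(
        'Log stream for pod %s ended but container is still running; resuming log read',
        pod.metadata.name,
    )
    ```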

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -286,29 +298,29 @@ def read_pod(self, pod: V1Pod) -> V1Pod:
         except BaseHTTPError as e:
             raise AirflowException(f'There was an error reading the kubernetes API: {e}')
 
-    def _extract_xcom(self, pod: V1Pod) -> str:
-        resp = kubernetes_stream(
-            self._client.connect_get_namespaced_pod_exec,
-            pod.metadata.name,
-            pod.metadata.namespace,
-            container=PodDefaults.SIDECAR_CONTAINER_NAME,
-            command=['/bin/sh'],
-            stdin=True,
-            stdout=True,
-            stderr=True,
-            tty=False,
-            _preload_content=False,
-        )
-        try:
+    def extract_xcom(self, pod: V1Pod) -> str:
+        """Retrieves xcom value using xcom value and kills xcom sidecar container"""

Review comment:
    ```suggestion
        """Retrieves XCom value and kills xcom sidecar container"""
    ```

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -328,101 +339,133 @@ def create_labels_for_pod(context) -> dict:
             labels[label_id] = safe_label
         return labels
 
-    def create_pod_launcher(self) -> Type[pod_launcher.PodLauncher]:
-        return pod_launcher.PodLauncher(kube_client=self.client, extract_xcom=self.do_xcom_push)
+    @cached_property
+    def launcher(self) -> pod_launcher.PodLauncher:
+        return pod_launcher.PodLauncher(kube_client=self.client)
 
-    def execute(self, context) -> Optional[str]:
+    @cached_property
+    def client(self) -> CoreV1Api:
+        # todo: use airflow Connection / hook to authenticate to the cluster
+        kwargs: Dict[str, Any] = dict(
+            cluster_context=self.cluster_context,
+            config_file=self.config_file,
+        )
+        if self.in_cluster is not None:
+            kwargs.update(in_cluster=self.in_cluster)
+        return kube_client.get_kube_client(**kwargs)
+
+    def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
+        """Returns an already-running pod for this task instance if one exists."""
+        labels = self._create_labels_for_pod(context)
+        label_selector = self._get_pod_identifying_label_string(labels)
+        pod_list = self.client.list_namespaced_pod(
+            namespace=namespace,
+            label_selector=label_selector,
+        ).items
+
+        num_pods = len(pod_list)
+        if num_pods > 1:
+            raise AirflowException(f'More than one pod running with labels {label_selector}')
+        elif num_pods == 1:
+            pod = pod_list[0]
+            self.log.info("Found matching pod %s", pod.metadata.name)
+            self._compare_try_numbers(context, pod)
+            return pod
+
+    def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context):
+        if self.reattach_on_restart:
+            pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
+            if pod:
+                return pod
+        self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
+        self.launcher.create_pod(pod=pod_request_obj)
+        return pod_request_obj
+
+    def await_pod_start(self, pod):
         try:
-            if self.in_cluster is not None:
-                client = kube_client.get_kube_client(
-                    in_cluster=self.in_cluster,
-                    cluster_context=self.cluster_context,
-                    config_file=self.config_file,
-                )
-            else:
-                client = kube_client.get_kube_client(
-                    cluster_context=self.cluster_context, config_file=self.config_file
-                )
-
-            self.client = client
-
-            self.pod = self.create_pod_request_obj()
-            self.namespace = self.pod.metadata.namespace
-
-            # Add combination of labels to uniquely identify a running pod
-            labels = self.create_labels_for_pod(context)
-
-            label_selector = self._get_pod_identifying_label_string(labels)
-
-            pod_list = self.client.list_namespaced_pod(self.namespace, label_selector=label_selector)
+            self.launcher.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds)
+        except PodLaunchFailedException:
+            if self.log_events_on_failure:
+                for event in self.launcher.read_pod_events(pod).items:
+                    self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            raise
 
-            if len(pod_list.items) > 1 and self.reattach_on_restart:
-                raise AirflowException(
-                    f'More than one pod running with labels: {label_selector}'
-                )
+    def extract_xcom(self, pod):
+        """Retrieves xcom value and kills xcom sidecar container"""
+        result = self.launcher.extract_xcom(pod)
+        self.log.info("xcom result: \n%s", result)
+        return json.loads(result)
 
-            launcher = self.create_pod_launcher()
+    def execute(self, context):
+        remote_pod = None
+        try:
+            self.pod_request_obj = self.build_pod_request_obj(context)
+            self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
+                pod_request_obj=self.pod_request_obj,
+                context=context,
+            )
+            self.await_pod_start(pod=self.pod)
 
-            if len(pod_list.items) == 1:
-                try_numbers_match = self._try_numbers_match(context, pod_list.items[0])
-                final_state, remote_pod, result = self.handle_pod_overlap(
-                    labels, try_numbers_match, launcher, pod_list.items[0]
+            if self.get_logs:
+                self.launcher.follow_container_logs(
+                    pod=self.pod,
+                    container_name=self.BASE_CONTAINER_NAME,
                 )
             else:
-                self.log.info("creating pod with labels %s and launcher %s", labels, launcher)
-                final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
-            if final_state != State.SUCCESS:
-                raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
-            context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name)
-            context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace)
-            return result
-        except AirflowException as ex:
-            raise AirflowException(f'Pod Launching failed: {ex}')
+                self.launcher.await_container_completion(
+                    pod=self.pod, container_name=self.BASE_CONTAINER_NAME
+                )
 
-    def handle_pod_overlap(
-        self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod
-    ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
-        """
+            if self.do_xcom_push:
+                result = self.extract_xcom(pod=self.pod)
+            remote_pod = self.launcher.await_pod_completion(self.pod)
+        finally:
+            self.cleanup(
+                pod=self.pod or self.pod_request_obj,
+                remote_pod=remote_pod,
+            )
+        if self.do_xcom_push:
+            ti = context['ti']
+            if remote_pod:
+                ti.xcom_push(key='pod_name', value=remote_pod.metadata.name)
+                ti.xcom_push(key='pod_namespace', value=remote_pod.metadata.namespace)
+            return result
 
-        In cases where the Scheduler restarts while a KubernetesPodOperator task is running,
-        this function will either continue to monitor the existing pod or launch a new pod
-        based on the `reattach_on_restart` parameter.
+    def cleanup(self, pod, remote_pod):
+        with _suppress(Exception):
+            self.process_pod_deletion(pod)
 
-        :param labels: labels used to determine if a pod is repeated
-        :type labels: dict
-        :param try_numbers_match: do the try numbers match? Only needed for logging purposes
-        :type try_numbers_match: bool
-        :param launcher: PodLauncher
-        :param pod: Pod found with matching labels
-        """
-        if try_numbers_match:
-            log_line = f"found a running pod with labels {labels} and the same try_number."
-        else:
-            log_line = f"found a running pod with labels {labels} but a different try_number."
-
-        # In case of failed pods, should reattach the first time, but only once
-        # as the task will have already failed.
-        if self.reattach_on_restart and not pod.metadata.labels.get("already_checked"):
-            log_line += " Will attach to this pod and monitor instead of starting new one"
-            self.log.info(log_line)
-            self.pod = pod
-            final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod)
+        pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
+        if pod_phase != PodStatus.SUCCEEDED:
+            if self.log_events_on_failure:
+                with _suppress(Exception):
+                    for event in self.launcher.read_pod_events(pod).items:
+                        self.log.error("Pod Event: %s - %s", event.reason, event.message)
+            if not self.is_delete_operator_pod:
+                with _suppress(Exception):
+                    self.patch_already_checked(pod)
+            raise AirflowException(f'Pod {pod and pod.metadata.name} returned a failure: {remote_pod}')
+
+    def process_pod_deletion(self, pod):
+        if self.is_delete_operator_pod:
+            self.log.info("deleting pod: %s", pod.metadata.name)
+            self.launcher.delete_pod(pod)
         else:
-            log_line += f"creating pod with labels {labels} and launcher {launcher}"
-            self.log.info(log_line)
-            final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher)
-        return final_state, remote_pod, result
+            self.log.info("skipping deleting pod: %s", pod.metadata.name)
 
-    @staticmethod
-    def _get_pod_identifying_label_string(labels) -> str:
+    def _get_pod_identifying_label_string(self, labels) -> str:
         label_strings = [
             f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
         ]
-        return ','.join(label_strings) + ',already_checked!=True'
-
-    @staticmethod
-    def _try_numbers_match(context, pod) -> bool:
-        return pod.metadata.labels['try_number'] == context['ti'].try_number
+        return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True'
+
+    def _compare_try_numbers(self, context, pod):
+        tries_match = pod.metadata.labels['try_number'] == context['ti'].try_number
+        self.log.info(
+            "found a running pod with labels %s %s try_number.",
+            pod.metadata.labels,
+            "and the same" if tries_match else "but a different",
+        )

Review comment:
       This method name and the argument names read unnaturally to me. How about only passing in the try numbers instead of the complete objects, so that it's clear from just the method name what's expected:
   
    ```suggestion
    def _compare_try_numbers(self, context_try_number: int, pod_try_number: int):
        tries_match = context_try_number == pod_try_number
        self.log.info(
            "Found a running pod with %s try_number (context: %s, pod: %s).",
            "the same" if tries_match else "a different",
            context_try_number,
            pod_try_number,
        )
    ```

    And calling it as:

    ```python
    self._compare_try_numbers(context['ti'].try_number, int(pod.metadata.labels['try_number']))
    ```

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -51,10 +54,22 @@ def should_retry_start_pod(exception: Exception) -> bool:
 class PodStatus:
     """Status of the PODs"""
 
-    PENDING = 'pending'
-    RUNNING = 'running'
-    FAILED = 'failed'
-    SUCCEEDED = 'succeeded'
+    PENDING = 'Pending'
+    RUNNING = 'Running'
+    FAILED = 'Failed'
+    SUCCEEDED = 'Succeeded'
+
+    terminal_states = {FAILED, SUCCEEDED}
+
+
+def container_is_running(pod: V1Pod, container_name: str) -> bool:

Review comment:
       Could you add a docstring here?
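
    For example (only a sketch, adjust as you see fit):

    ```python
    def container_is_running(pod: V1Pod, container_name: str) -> bool:
        """Examines V1Pod ``pod`` to determine whether ``container_name`` is running."""
    ```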

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -212,35 +244,16 @@ def parse_log_line(self, line: str) -> Tuple[Optional[Union[Date, Time, DateTime
             return None, line
         return last_log_time, message
 
-    def _task_status(self, event: V1Event) -> str:
-        self.log.info('Event: %s had an event of type %s', event.metadata.name, event.status.phase)
-        status = self.process_status(event.metadata.name, event.status.phase)
-        return status
-
-    def pod_not_started(self, pod: V1Pod) -> bool:
-        """Tests if pod has not started"""
-        state = self._task_status(self.read_pod(pod))
-        return state == State.QUEUED
-
-    def pod_is_running(self, pod: V1Pod) -> bool:
-        """Tests if pod is running"""
-        state = self._task_status(self.read_pod(pod))
-        return state not in (State.SUCCESS, State.FAILED)
-
-    def base_container_is_running(self, pod: V1Pod) -> bool:
-        """Tests if base container is running"""
-        event = self.read_pod(pod)
-        if not (event and event.status and event.status.container_statuses):
-            return False
-        status = next(iter(filter(lambda s: s.name == 'base', event.status.container_statuses)), None)
-        if not status:
-            return False
-        return status.state.running is not None
+    def container_is_running(self, pod: V1Pod, container_name) -> bool:

Review comment:
    ```suggestion
    def container_is_running(self, pod: V1Pod, container_name: str) -> bool:
    ```

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -501,83 +561,40 @@ def create_pod_request_obj(self) -> k8s.V1Pod:
         if self.do_xcom_push:
             self.log.debug("Adding xcom sidecar to task %s", self.task_id)
             pod = xcom_sidecar.add_xcom_sidecar(pod)
-        return pod
 
-    def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Pod, Optional[str]]:
-        """
-        Creates a new pod and monitors for duration of task
-
-        :param labels: labels used to track pod
-        :param launcher: pod launcher that will manage launching and monitoring pods
-        :return:
-        """
-        self.log.debug(
-            "Adding KubernetesPodOperator labels to pod before launch for task %s", self.task_id
-        )
+        labels = self._create_labels_for_pod(context)
+        self.log.info("creating pod %s with labels: %s", pod.metadata.name, labels)

Review comment:
    ```suggestion
        self.log.info("Creating pod %s with labels: %s", pod.metadata.name, labels)
    ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org