You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/16 20:37:19 UTC

[airflow] branch main updated: Fix xcom_sidecar stuck problem (#24993)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f05a06537b Fix xcom_sidecar stuck problem (#24993)
f05a06537b is described below

commit f05a06537be4d12276862eae1960515c76aa11d1
Author: Maksim <ma...@google.com>
AuthorDate: Sat Jul 16 23:37:11 2022 +0300

    Fix xcom_sidecar stuck problem (#24993)
---
 airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py | 8 ++++++--
 airflow/providers/cncf/kubernetes/utils/pod_manager.py        | 5 ++++-
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index a26e2f1c59..90d4fb88de 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -402,8 +402,12 @@ class KubernetesPodOperator(BaseOperator):
     def extract_xcom(self, pod: k8s.V1Pod):
         """Retrieves xcom value and kills xcom sidecar container"""
         result = self.pod_manager.extract_xcom(pod)
-        self.log.info("xcom result: \n%s", result)
-        return json.loads(result)
+        if isinstance(result, str) and result.rstrip() == '__airflow_xcom_result_empty__':
+            self.log.info("Result file is empty.")
+            return None
+        else:
+            self.log.info("xcom result: \n%s", result)
+            return json.loads(result)
 
     def execute(self, context: 'Context'):
         remote_pod = None
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 4a1f0539ee..52b7113a59 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -370,7 +370,10 @@ class PodManager(LoggingMixin):
                 _preload_content=False,
             )
         ) as resp:
-            result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json')
+            result = self._exec_pod_command(
+                resp,
+                f'if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; else echo __airflow_xcom_result_empty__; fi',  # noqa
+            )
             self._exec_pod_command(resp, 'kill -s SIGINT 1')
         if result is None:
             raise AirflowException(f'Failed to extract xcom from pod: {pod.metadata.name}')