You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/07/16 20:37:19 UTC
[airflow] branch main updated: Fix xcom_sidecar stuck problem (#24993)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f05a06537b Fix xcom_sidecar stuck problem (#24993)
f05a06537b is described below
commit f05a06537be4d12276862eae1960515c76aa11d1
Author: Maksim <ma...@google.com>
AuthorDate: Sat Jul 16 23:37:11 2022 +0300
Fix xcom_sidecar stuck problem (#24993)
---
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py | 8 ++++++--
airflow/providers/cncf/kubernetes/utils/pod_manager.py | 5 ++++-
2 files changed, 10 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index a26e2f1c59..90d4fb88de 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -402,8 +402,12 @@ class KubernetesPodOperator(BaseOperator):
def extract_xcom(self, pod: k8s.V1Pod):
"""Retrieves xcom value and kills xcom sidecar container"""
result = self.pod_manager.extract_xcom(pod)
- self.log.info("xcom result: \n%s", result)
- return json.loads(result)
+ if isinstance(result, str) and result.rstrip() == '__airflow_xcom_result_empty__':
+ self.log.info("Result file is empty.")
+ return None
+ else:
+ self.log.info("xcom result: \n%s", result)
+ return json.loads(result)
def execute(self, context: 'Context'):
remote_pod = None
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 4a1f0539ee..52b7113a59 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -370,7 +370,10 @@ class PodManager(LoggingMixin):
_preload_content=False,
)
) as resp:
- result = self._exec_pod_command(resp, f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json')
+ result = self._exec_pod_command(
+ resp,
+ f'if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; else echo __airflow_xcom_result_empty__; fi', # noqa
+ )
self._exec_pod_command(resp, 'kill -s SIGINT 1')
if result is None:
raise AirflowException(f'Failed to extract xcom from pod: {pod.metadata.name}')