Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/22 23:16:40 UTC

[GitHub] [airflow] jedcunningham commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

jedcunningham commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055921589


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,
+            run_id=run_id,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
+        selector = ",".join(label_strings)
+        selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+        }
+        if airflow_worker is not None:

Review Comment:
   I wonder if we should have `build_selector_for_k8s_executor_pod` build its own dict so we don't have to do all these conditional checks - it took me a second to figure out why it was this way. Plus it'll ensure we set everything we actually want to on the pod (e.g. airflow_version).
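
A rough sketch of what that could look like, not a proposal for the exact code. It assumes the selector only needs the identifying labels shown in the diff; `make_safe_label_value` here is a simplified stand-in for the real helper, and the handling of `map_index` and `run_id` is a guess, since the hunk above is cut off before those branches.

```python
import re


def make_safe_label_value(value: str) -> str:
    """Simplified stand-in for Airflow's helper of the same name (assumption)."""
    return re.sub(r"[^a-zA-Z0-9_.-]", "_", value)[:63]


def build_selector_for_k8s_executor_pod(*, dag_id, task_id, try_number, map_index=None, run_id=None):
    """Build the label selector from its own dict instead of reusing the label builder."""
    labels = {
        "dag_id": make_safe_label_value(dag_id),
        "task_id": make_safe_label_value(task_id),
        "try_number": str(try_number),
        "kubernetes_executor": "True",
    }
    # Optional identifiers are only added when the caller supplies them (assumed behavior).
    if map_index is not None:
        labels["map_index"] = str(map_index)
    if run_id is not None:
        labels["run_id"] = make_safe_label_value(run_id)
    parts = [f"{key}={value}" for key, value in sorted(labels.items())]
    # A bare key in a Kubernetes label selector matches any pod that has that label.
    parts.append("airflow-worker")
    return ",".join(parts)
```

With that split, `build_labels_for_k8s_executor_pod` could take its arguments as required and set every label unconditionally.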



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(

Review Comment:
   Is it easy to get scheduler_job_id here? Adding it to the selector would make the lookup a little more resilient when more than one Airflow instance runs in a single namespace.
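
To illustrate, a minimal sketch of the selector term, assuming the `airflow-worker` label carries the scheduler job id as its value (the diff only shows the bare-key form). `worker_selector_term` is a hypothetical helper, not something in the PR.

```python
def worker_selector_term(scheduler_job_id=None):
    """Return the airflow-worker part of a Kubernetes label selector.

    Hypothetical helper: assumes the label value is the scheduler job id.
    """
    if scheduler_job_id is None:
        # Bare key: matches any pod that has the label, whatever its value.
        return "airflow-worker"
    # Equality match: only pods launched by this scheduler job.
    return f"airflow-worker={scheduler_job_id}"
```

The bare-key form is what the selector in this PR uses today; the equality form would narrow the match to one scheduler job if the id is available at log-read time.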



##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,

Review Comment:
   Do we need both execution_date and run_id on the selector?



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    map_index=ti.map_index,
+                    run_id=ti.run_id,
+                )
+                namespace = self._get_pod_namespace(ti)
+                pod_list = client.list_namespaced_pod(
+                    namespace=namespace,
+                    label_selector=selector,
+                ).items
+                if not pod_list:
+                    raise RuntimeError("Cannot find pod for ti %s", ti)
+                res = client.read_namespaced_pod_log(
+                    name=pod_list[0].metadata.name,

Review Comment:
   If we have more than 1, we should probably log/bail.
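
Something along these lines, as a sketch only. `pick_single_pod` is a hypothetical helper name, and raising (rather than logging and continuing) is one possible choice.

```python
def pick_single_pod(pod_list, ti):
    """Return the only matching pod, or raise if there are zero or several.

    Hypothetical helper: pod_list is the `.items` of a pod list response,
    ti is anything printable that identifies the task instance.
    """
    if not pod_list:
        raise RuntimeError(f"Cannot find pod for ti {ti}")
    if len(pod_list) > 1:
        names = ", ".join(pod.metadata.name for pod in pod_list)
        raise RuntimeError(f"Found multiple pods for ti {ti}: {names}")
    return pod_list[0]
```

The f-strings are deliberate: `RuntimeError("... %s", ti)` stores the arguments as a tuple and does not interpolate them, so the message would not be formatted the way a logging call would format it.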


