Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/22 22:58:14 UTC

[GitHub] [airflow] dstandish opened a new pull request, #28546: Use labels instead of pod name for pod log read in k8s exec

dstandish opened a new pull request, #28546:
URL: https://github.com/apache/airflow/pull/28546

   This means we no longer have to use `ti.hostname` as a proxy for the pod name, and it lets us lift the 63-character limit, which was a consequence of getting the pod name through the hostname.
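A minimal sketch of the kind of lookup this PR switches to. It assumes a client object that exposes the `list_namespaced_pod(namespace=..., label_selector=...)` method of the official `kubernetes` Python client, whose result carries matches in `.items`, each with `.metadata.name`. The function name `find_worker_pod_name` and the in-memory `FakeCoreV1Api` are hypothetical. The fake is included only so the example runs without a cluster.

```python
from types import SimpleNamespace


def find_worker_pod_name(client, namespace: str, selector: str) -> str:
    """Resolve a worker pod's name from its labels rather than from ti.hostname.

    `client` is assumed to behave like kubernetes.client.CoreV1Api.
    """
    pods = client.list_namespaced_pod(namespace=namespace, label_selector=selector).items
    if not pods:
        raise RuntimeError(f"Cannot find pod matching selector {selector!r}")
    return pods[0].metadata.name


class FakeCoreV1Api:
    """Hypothetical in-memory stand-in for CoreV1Api (supports key=value terms only)."""

    def __init__(self, pods):
        self._pods = pods

    def list_namespaced_pod(self, namespace, label_selector):
        wanted = dict(term.split("=", 1) for term in label_selector.split(","))
        items = [
            pod
            for pod in self._pods
            if pod.metadata.namespace == namespace
            and all(pod.metadata.labels.get(k) == v for k, v in wanted.items())
        ]
        # Mirror the V1PodList shape: matches are exposed via `.items`.
        return SimpleNamespace(items=items)


def make_pod(name, namespace, labels):
    """Build a minimal object shaped like V1Pod (metadata.name/namespace/labels)."""
    return SimpleNamespace(metadata=SimpleNamespace(name=name, namespace=namespace, labels=labels))


# The lookup never derives anything from the name, so its length does not matter here.
long_name = "my-dag-" + "x" * 80
client = FakeCoreV1Api(
    [make_pod(long_name, "airflow", {"dag_id": "my_dag", "task_id": "t1", "try_number": "1"})]
)
```

Because the pod is found by labels, the name is only read back from the API response and never reconstructed from `ti.hostname`.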
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055921963


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    map_index=ti.map_index,
+                    run_id=ti.run_id,
+                )
+                namespace = self._get_pod_namespace(ti)
+                pod_list = client.list_namespaced_pod(
+                    namespace=namespace,
+                    label_selector=selector,
+                ).items
+                if not pod_list:
+                    raise RuntimeError(f"Cannot find pod for ti {ti}")

Review Comment:
   It's already inside a try/except, so that catches the error and does just that.
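The reply can be illustrated with a hypothetical, simplified version of the handler's flow. `read_remote_log` and `lookup` are illustrative names, not Airflow's. A `raise` inside the existing `try` is caught by the same `except` clause that writes the error into the returned log. That clause is modeled on the `log += f"*** {str(e)}\n"` pattern visible in the diff context. The sketch pre-formats the message because exception constructors, unlike logging calls, do not interpolate `%s` placeholders.

```python
from __future__ import annotations


def read_remote_log(lookup, ti_repr: str) -> tuple[str, dict]:
    """Hypothetical simplification of the handler's k8s branch.

    `lookup` stands in for the pod listing call and returns a list of pod names.
    """
    log = ""
    try:
        pods = lookup()
        if not pods:
            # Format eagerly: exception constructors do not apply %-style arguments.
            raise RuntimeError(f"Cannot find pod for ti {ti_repr}")
        log += f"*** Found pod {pods[0]} ***\n"
    except Exception as e:
        # The same except clause reports any failure, including the raise above.
        log += f"*** {e}\n"
        return log, {"end_of_log": True}
    return log, {"end_of_log": False}
```

For comparison, `str(RuntimeError("Cannot find pod for ti %s", "x"))` is `"('Cannot find pod for ti %s', 'x')"`, because the extra argument is stored in `args` instead of being interpolated.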



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    map_index=ti.map_index,
+                    run_id=ti.run_id,
+                )
+                namespace = self._get_pod_namespace(ti)
+                pod_list = client.list_namespaced_pod(
+                    namespace=namespace,
+                    label_selector=selector,
+                ).items
+                if not pod_list:
+                    raise RuntimeError(f"Cannot find pod for ti {ti}")

Review Comment:
   So I could either duplicate that code or just raise :)



[GitHub] [airflow] jedcunningham commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055936849


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,
+            run_id=run_id,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
+        selector = ",".join(label_strings)
+        selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+        }
+        if airflow_worker is not None:

Review Comment:
   I should have air-quoted "public". My point is that it feels weird to me for `build` to work at all without requiring every label we'd set on the pod, because that moves "what's really required" outside of PodGenerator. I'm coming at it from the perspective of PodGenerator's "public internal" interface.
   
   e.g.
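   ```python
   def _get_labels_dict(
       *,
       dag_id,
       task_id,
       try_number,
       airflow_worker=None,
       map_index=None,
       date=None,
       run_id=None,
   ):
       ...  # shared label-building logic (elided in this sketch)
   
   def build(
       *,
       dag_id,
       task_id,
       try_number,
       airflow_worker,
       map_index,
       date,
       run_id,
   ):
       # for pod creation: every label is a required argument
       return _get_labels_dict(
           dag_id=dag_id,
           task_id=task_id,
           try_number=try_number,
           airflow_worker=airflow_worker,
           map_index=map_index,
           date=date,
           run_id=run_id,
       )
   
   def selector(*, dag_id, task_id, try_number):
       # for pod lookup: only the labels needed to find the pod
       return _get_labels_dict(dag_id=dag_id, task_id=task_id, try_number=try_number)
   ```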
   
   This seems to mesh more easily with what you need to do when building labels vs. getting the selector (e.g. code completion/typing; `:meta private:` only matters to Sphinx, after all).
   
   However, enough bikeshedding on my part :) 



[GitHub] [airflow] jedcunningham commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1059515390


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,
+            run_id=run_id,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
+        selector = ",".join(label_strings)
+        selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+        }
+        if airflow_worker is not None:

Review Comment:
   Ultimately it's just a different design decision: whether the caller must know what's required, or the callable is able to enforce what's required. Not a big deal.



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055923312


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,

Review Comment:
   Well, they're both optional; you could use either, both, or neither...
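A minimal sketch of the point being made: optional labels such as `date` and `run_id` are added only when the caller supplies them. A selector built from just `run_id`, as the file task handler does, therefore matches on fewer labels. `build_labels` and `build_selector` are simplified, hypothetical stand-ins for `build_labels_for_k8s_executor_pod` and `build_selector_for_k8s_executor_pod`, and the `make_safe_label_value` sanitizing step is omitted. The label keys for the optional values (`map_index`, `execution_date`, `run_id`) are assumptions, since the diff is cut off before those lines.

```python
from __future__ import annotations


def build_labels(*, dag_id, task_id, try_number, map_index=None, date=None, run_id=None) -> dict[str, str]:
    """Simplified stand-in: required labels always, optional ones only if given."""
    labels = {
        "dag_id": dag_id,
        "task_id": task_id,
        "try_number": str(try_number),
        "kubernetes_executor": "True",
    }
    # Hypothetical keys for the optional labels (the quoted diff stops before them).
    if map_index is not None:
        labels["map_index"] = str(map_index)
    if date is not None:
        labels["execution_date"] = date
    if run_id is not None:
        labels["run_id"] = run_id
    return labels


def build_selector(**kwargs) -> str:
    """Join the labels into an equality-based selector, as the diff does."""
    labels = build_labels(**kwargs)
    # Comma-separated terms are ANDed; the bare key only requires the label to exist.
    return ",".join(f"{k}={v}" for k, v in sorted(labels.items())) + ",airflow-worker"
```

Passing `run_id` alone, `date` alone, both, or neither all produce a valid selector. Each extra label just narrows the match.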



[GitHub] [airflow] dstandish merged pull request #28546: Use labels instead of pod name for pod log read in k8s exec

Posted by GitBox <gi...@apache.org>.
dstandish merged PR #28546:
URL: https://github.com/apache/airflow/pull/28546


[GitHub] [airflow] jedcunningham commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055921589


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,
+            run_id=run_id,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
+        selector = ",".join(label_strings)
+        selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+        }
+        if airflow_worker is not None:

Review Comment:
   I wonder if we should have `build_selector_for_k8s_executor_pod` build its own dict so we don't need all these conditional checks; it took me a second to figure out why it was this way. Plus, it'll ensure we set everything we actually want on the pod (e.g. airflow_version).
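A minimal sketch of the alternative suggested here. The selector function builds its own dict of exactly the labels used for lookup, instead of reusing the conditional labels builder. The function name is hypothetical, and `make_safe_label_value` sanitizing is omitted for brevity.

```python
from __future__ import annotations


def build_selector_for_executor_pod(*, dag_id: str, task_id: str, try_number: int, run_id: str) -> str:
    """Hypothetical: every label the lookup relies on is a required argument."""
    labels = {
        "dag_id": dag_id,
        "task_id": task_id,
        "try_number": str(try_number),
        "run_id": run_id,
        "kubernetes_executor": "True",
    }
    # No conditionals: the dict is exactly what the selector matches on.
    terms = [f"{key}={value}" for key, value in sorted(labels.items())]
    terms.append("airflow-worker")  # bare key: the label only has to exist
    return ",".join(terms)
```

With this shape, omitting `run_id` fails with a `TypeError` at call time. The callable enforces what is required instead of leaving that to the caller.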



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(

Review Comment:
   Is it easy to get scheduler_job_id here? That'll make the lookup a little more resilient when there's more than one Airflow instance in a single namespace.
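For illustration, a label selector can pin the `airflow-worker` label to a specific value instead of only requiring that the key exists. In Kubernetes selector syntax, a bare `airflow-worker` term matches any pod carrying that label. An `airflow-worker=<id>` term matches only pods whose label has that value. The helper names are hypothetical. The assumption that this label holds the scheduler job ID comes from this comment, not from the quoted diff. `matches` evaluates only the `key=value` and bare `key` forms used here.

```python
from __future__ import annotations


def with_worker_term(base_selector: str, scheduler_job_id: str | None = None) -> str:
    """Append the airflow-worker term to a selector (hypothetical helper).

    Without a job ID the term is an existence check; with one it is an equality
    check, so pods from another Airflow instance in the same namespace won't match.
    """
    term = "airflow-worker" if scheduler_job_id is None else f"airflow-worker={scheduler_job_id}"
    return f"{base_selector},{term}" if base_selector else term


def matches(selector: str, labels: dict[str, str]) -> bool:
    """Evaluate the subset of selector syntax used here: 'key=value' and bare 'key'."""
    for term in selector.split(","):
        key, sep, value = term.partition("=")
        if key not in labels:
            return False
        if sep and labels[key] != value:
            return False
    return True
```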



##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,

Review Comment:
   Do we need both execution_date and run_id on the selector?



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    map_index=ti.map_index,
+                    run_id=ti.run_id,
+                )
+                namespace = self._get_pod_namespace(ti)
+                pod_list = client.list_namespaced_pod(
+                    namespace=namespace,
+                    label_selector=selector,
+                ).items
+                if not pod_list:
+                    raise RuntimeError(f"Cannot find pod for ti {ti}")
+                res = client.read_namespaced_pod_log(
+                    name=pod_list[0].metadata.name,

Review Comment:
   If we find more than one pod, we should probably log and bail.
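One way to act on this suggestion, sketched with a hypothetical `pick_single_pod` helper. It takes the `.items` list returned by `list_namespaced_pod` and fails when nothing matches. When the selector is ambiguous, it logs a warning and bails instead of silently reading `pod_list[0]`. The `SimpleNamespace` objects only mimic the `metadata.name` attribute of real pod objects.

```python
from __future__ import annotations

import logging
from types import SimpleNamespace

logger = logging.getLogger(__name__)


def pick_single_pod(pod_list: list, selector: str):
    """Return the only pod matching `selector`, or raise if there are zero or several."""
    if not pod_list:
        raise RuntimeError(f"Cannot find pod for selector {selector!r}")
    if len(pod_list) > 1:
        names = sorted(p.metadata.name for p in pod_list)
        logger.warning("Selector %r matched %d pods: %s", selector, len(pod_list), names)
        raise RuntimeError(f"Expected one pod for selector {selector!r}, found {len(pod_list)}")
    return pod_list[0]


# Example input shaped like V1Pod objects (only .metadata.name is used).
pods = [SimpleNamespace(metadata=SimpleNamespace(name=n)) for n in ("pod-a", "pod-b")]
```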



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055927346


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,
+            run_id=run_id,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
+        selector = ",".join(label_strings)
+        selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+        }
+        if airflow_worker is not None:

Review Comment:
   We do basically the same thing for KPO (KubernetesPodOperator)...



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055924742


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(

Review Comment:
   There is `ti.queued_by_job_id`... but will that agree with the scheduler job ID?



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055926224


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(

Review Comment:
   done



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055966660


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,
+            run_id=run_id,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
+        selector = ",".join(label_strings)
+        selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+        }
+        if airflow_worker is not None:

Review Comment:
   I don't mind the discussion
   
   I *think* I understand where you're coming from
   
   The way I see it, in this case, the caller must know what it needs and it's not the labels function to decide. It just builds requested labels in the right way.  If we make the function decide what is required, that's sort of a "logic in two places" kind of thing. And so, that's why I'm able to use the one labels func in two ways. It's not worried about the requirements -- that's the callers job, and different callers have different needs -- one is for pod creation and the other for pod finding.
   
   Re private, I think it's @uranusjr's position that `:meta private:` means not part of the public API, not just in terms of docs but also backcompat, and so that's what I went with here. It also has the pleasant side effect that there is no "private method accessed" warning when using it in FTH (FileTaskHandler), as I do here. And, really, the fact that it's private is the most important thing for me, because then decisions about the interface can be changed whenever we like, so within reason it doesn't really matter.
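
A minimal sketch of the pattern described in this comment, assuming a hypothetical `build_labels` that stands in for `build_labels_for_k8s_executor_pod` with label-value sanitizing and `date` handling left out; all values are made up. The function only builds what it is asked for, and each caller decides which labels to request.

```python
from __future__ import annotations


def build_labels(
    *,
    dag_id: str,
    task_id: str,
    try_number: int,
    airflow_worker: str | None = None,
    map_index: int | None = None,
    run_id: str | None = None,
) -> dict[str, str]:
    """Hypothetical sketch: build exactly the labels the caller requested."""
    labels = {
        "dag_id": dag_id,
        "task_id": task_id,
        "try_number": str(try_number),
        "kubernetes_executor": "True",
    }
    # Optional labels are added only when the caller supplies them.
    if airflow_worker is not None:
        labels["airflow-worker"] = airflow_worker
    if map_index is not None:
        labels["map_index"] = str(map_index)
    if run_id is not None:
        labels["run_id"] = run_id
    return labels


# Caller 1 (pod creation) knows which worker owns the pod.
creation_labels = build_labels(
    dag_id="example_dag", task_id="t1", try_number=1, airflow_worker="42", run_id="r1"
)
# Caller 2 (pod lookup) does not know the worker, so it leaves that label out.
lookup_labels = build_labels(dag_id="example_dag", task_id="t1", try_number=1, run_id="r1")
```

Because both callers go through one function, the lookup labels are always a subset of the creation labels, and the function itself never decides which labels are "required".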



[GitHub] [airflow] jedcunningham commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

jedcunningham commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055926509


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,
+            run_id=run_id,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
+        selector = ",".join(label_strings)
+        selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+        }
+        if airflow_worker is not None:

Review Comment:
   Right, good point. Let's pull the heavy lifting with these optional args into a private function then, and have the public `build_labels_for_k8s_executor_pod` require them? That should cover everything?
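
A minimal sketch of the split being proposed, using hypothetical names (`_build_labels`, `build_pod_labels`, `build_pod_selector`) and a reduced label set. The private helper keeps the optional arguments, while the public creation entry point makes them required, so a pod cannot be created without the labels the lookup depends on.

```python
from __future__ import annotations


def _build_labels(
    *,
    dag_id: str,
    task_id: str,
    try_number: int,
    airflow_worker: str | None = None,
    run_id: str | None = None,
) -> dict[str, str]:
    """Private helper: optional args, only the label-building mechanics."""
    labels = {"dag_id": dag_id, "task_id": task_id, "try_number": str(try_number)}
    if airflow_worker is not None:
        labels["airflow-worker"] = airflow_worker
    if run_id is not None:
        labels["run_id"] = run_id
    return labels


def build_pod_labels(
    *, dag_id: str, task_id: str, try_number: int, airflow_worker: str, run_id: str
) -> dict[str, str]:
    """Public entry point for pod creation: every argument is required."""
    return _build_labels(
        dag_id=dag_id,
        task_id=task_id,
        try_number=try_number,
        airflow_worker=airflow_worker,
        run_id=run_id,
    )


def build_pod_selector(*, dag_id: str, task_id: str, try_number: int, run_id: str) -> str:
    """Lookup: same helper, minus the worker label, which is matched by existence only."""
    labels = _build_labels(dag_id=dag_id, task_id=task_id, try_number=try_number, run_id=run_id)
    return ",".join(f"{k}={v}" for k, v in sorted(labels.items())) + ",airflow-worker"
```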



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055927732


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,

Review Comment:
   So, it's probably backcompat legacy for construct_pod... because everything there is a positional arg, they are all "required".
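
Why positional signatures tend to leave later parameters optional: a parameter appended to a positional signature needs a default, otherwise every existing positional call site breaks. A toy sketch; `construct_pod_sketch` and its parameters are hypothetical and not the real `construct_pod` signature.

```python
from __future__ import annotations


def construct_pod_sketch(dag_id, task_id, try_number, run_id=None, map_index=-1):
    """Toy positional signature: run_id and map_index were 'added later', so they need defaults."""
    return {
        "dag_id": dag_id,
        "task_id": task_id,
        "try_number": try_number,
        "run_id": run_id,
        "map_index": map_index,
    }


# An old call site written before run_id existed still works...
old_style = construct_pod_sketch("example_dag", "t1", 1)
# ...and a new one can pass it, positionally or by keyword.
new_style = construct_pod_sketch("example_dag", "t1", 1, run_id="r1")
```

The cost is that the signature no longer says run_id is needed, so it looks optional downstream even though it is part of the task instance's identity.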



[GitHub] [airflow] jedcunningham commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

jedcunningham commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055925414


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(

Review Comment:
   Yes, it should, plus it gets patched when the pod is adopted by a new scheduler.
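
Because the `airflow-worker` label value can change when a pod is adopted by a new scheduler, the selector in the diff ends with a bare `airflow-worker` term, which in Kubernetes label-selector syntax only requires the key to exist. The sketch below uses a hypothetical `matches` helper that evaluates only the `key=value` and bare `key` forms (the API server supports more operators) to show that such a selector still finds the pod after its worker label is patched. Label values are made up.

```python
from __future__ import annotations


def matches(selector: str, labels: dict[str, str]) -> bool:
    """Evaluate a comma-separated selector; every term must hold (logical AND).

    Only two forms are handled here: "key=value" (equality) and "key" (existence).
    """
    for term in selector.split(","):
        term = term.strip()
        if not term:
            continue
        if "=" in term:
            key, value = term.split("=", 1)
            if labels.get(key) != value:
                return False
        elif term not in labels:
            return False
    return True


pod_labels = {"dag_id": "example_dag", "task_id": "t1", "airflow-worker": "1"}
by_existence = "dag_id=example_dag,task_id=t1,airflow-worker"
by_value = "dag_id=example_dag,task_id=t1,airflow-worker=1"

before = (matches(by_existence, pod_labels), matches(by_value, pod_labels))
# Simulate adoption: the new scheduler patches the worker label.
pod_labels["airflow-worker"] = "2"
after = (matches(by_existence, pod_labels), matches(by_value, pod_labels))
```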



[GitHub] [airflow] ashb commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

ashb commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055920303


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    map_index=ti.map_index,
+                    run_id=ti.run_id,
+                )
+                namespace = self._get_pod_namespace(ti)
+                pod_list = client.list_namespaced_pod(
+                    namespace=namespace,
+                    label_selector=selector,
+                ).items
+                if not pod_list:
+                    raise RuntimeError("Cannot find pod for ti %s", ti)

Review Comment:
   Instead of raising an error (which I think will result in a 500!) should we do something like we do on L191-192:
   ```
                   log += f"*** {str(e)}\n"
                   return log, {"end_of_log": True}
   ```
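
A minimal sketch of the suggested pattern, with a hypothetical `read_k8s_log` that takes a plain list of pod names instead of a Kubernetes client. It also shows a side issue in the quoted line: `RuntimeError("Cannot find pod for ti %s", ti)` does not interpolate `ti`; the exception just stores both arguments as a tuple, so the message is better formatted eagerly.

```python
from __future__ import annotations

from typing import Any


def read_k8s_log(pod_names: list[str], ti: str, log: str = "") -> tuple[str, dict[str, Any]]:
    """Sketch: report a missing pod in the log text instead of raising out of the handler."""
    try:
        if not pod_names:
            # Format eagerly; "%s"-style args are not interpolated by exception constructors.
            raise RuntimeError(f"Cannot find pod for ti {ti}")
        pod_name = pod_names[0]
        # A real handler would read the pod's log here; this sketch only records the pod name.
        log += f"*** Reading logs from pod {pod_name} ***\n"
    except Exception as e:
        # Same shape as the existing error branch: show the error and stop polling.
        log += f"*** {str(e)}\n"
        return log, {"end_of_log": True}
    # Metadata for the success path is out of scope for this sketch.
    return log, {}
```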



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055923961


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,
+            run_id=run_id,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
+        selector = ",".join(label_strings)
+        selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+        }
+        if airflow_worker is not None:

Review Comment:
   I'm a bit confused...
   but...
   the reason I want to reuse a shared labels func is that the way we generate the selector must agree with the way we generate the labels. If there is any inconsistency, we may not find the pod. The only way to guarantee consistency is to use the same func.
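
The risk described above, sketched under simplifying assumptions: `safe_value` is a hypothetical, deliberately simplified sanitizer (it only strips characters outside `[A-Za-z0-9_.-]`; it is not Airflow's `make_safe_label_value`), and the run_id is a made-up example. A selector derived from the same labels function matches the pod, while one assembled by hand from raw values silently does not.

```python
from __future__ import annotations

import re


def safe_value(value: str) -> str:
    """Hypothetical sanitizer: drop characters not allowed in label values."""
    return re.sub(r"[^A-Za-z0-9_.-]", "", value)


def build_labels(*, dag_id: str, task_id: str, try_number: int, run_id: str) -> dict[str, str]:
    return {
        "dag_id": safe_value(dag_id),
        "task_id": safe_value(task_id),
        "try_number": str(try_number),
        "run_id": safe_value(run_id),
    }


def build_selector(*, dag_id: str, task_id: str, try_number: int, run_id: str) -> str:
    # Derive the selector from the very same labels used at pod creation time.
    labels = build_labels(dag_id=dag_id, task_id=task_id, try_number=try_number, run_id=run_id)
    return ",".join(f"{k}={v}" for k, v in sorted(labels.items()))


def selector_matches(selector: str, labels: dict[str, str]) -> bool:
    """Equality terms only: every key=value pair must be present on the pod."""
    pairs = (term.split("=", 1) for term in selector.split(","))
    return all(labels.get(k) == v for k, v in pairs)


RUN_ID = "manual__2022-12-22T22:58:14+00:00"
pod_labels = build_labels(dag_id="my_dag", task_id="t1", try_number=1, run_id=RUN_ID)
shared = build_selector(dag_id="my_dag", task_id="t1", try_number=1, run_id=RUN_ID)
hand_built = f"dag_id=my_dag,run_id={RUN_ID},task_id=t1,try_number=1"
```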



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1056727004


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,
+            run_id=run_id,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
+        selector = ",".join(label_strings)
+        selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+        }
+        if airflow_worker is not None:

Review Comment:
   But I should add... honestly, I don't feel strongly. And particularly since the intention is to make these methods non-public (i.e. internal, no backcompat, therefore no friction to change in future), I'm happy to code it up however.



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055926291


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                    dag_id=ti.dag_id,
+                    task_id=ti.task_id,
+                    try_number=ti.try_number,
+                    map_index=ti.map_index,
+                    run_id=ti.run_id,
+                )
+                namespace = self._get_pod_namespace(ti)
+                pod_list = client.list_namespaced_pod(
+                    namespace=namespace,
+                    label_selector=selector,
+                ).items
+                if not pod_list:
+                    raise RuntimeError("Cannot find pod for ti %s", ti)
+                res = client.read_namespaced_pod_log(
+                    name=pod_list[0].metadata.name,

Review Comment:
   done



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055927080


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,
+            run_id=run_id,
+        )
+        label_strings = [f"{label_id}={label}" for label_id, label in sorted(labels.items())]
+        selector = ",".join(label_strings)
+        selector += ",airflow-worker"
+        return selector
+
+    @classmethod
+    def build_labels_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        airflow_worker=None,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate labels for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = {
+            "dag_id": make_safe_label_value(dag_id),
+            "task_id": make_safe_label_value(task_id),
+            "try_number": str(try_number),
+            "kubernetes_executor": "True",
+        }
+        if airflow_worker is not None:

Review Comment:
   I have both of these methods set to private (TP style, `:meta private:`), and `build_labels_for_k8s_executor_pod` _is_ the "heavy lifting" one IMO... can you clarify?



[GitHub] [airflow] dstandish commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

dstandish commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055925550


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -191,19 +191,29 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
         elif self._should_check_k8s(ti.queue):
-            pod_override = ti.executor_config.get("pod_override")
-            if pod_override and pod_override.metadata and pod_override.metadata.namespace:
-                namespace = pod_override.metadata.namespace
-            else:
-                namespace = conf.get("kubernetes_executor", "namespace")
             try:
                 from airflow.kubernetes.kube_client import get_kube_client
+                from airflow.kubernetes.pod_generator import PodGenerator
 
-                kube_client = get_kube_client()
+                client = get_kube_client()
 
                 log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
+                selector = PodGenerator.build_selector_for_k8s_executor_pod(

Review Comment:
   ok 
   



[GitHub] [airflow] jedcunningham commented on a diff in pull request #28546: Use labels instead of pod name for pod log read in k8s exec

jedcunningham commented on code in PR #28546:
URL: https://github.com/apache/airflow/pull/28546#discussion_r1055927271


##########
airflow/kubernetes/pod_generator.py:
##########
@@ -430,6 +419,70 @@ def construct_pod(
         except Exception as e:
             raise PodReconciliationError from e
 
+    @classmethod
+    def build_selector_for_k8s_executor_pod(
+        cls,
+        *,
+        dag_id,
+        task_id,
+        try_number,
+        map_index=None,
+        date=None,
+        run_id=None,
+    ):
+        """
+        Generate selector for kubernetes executor pod
+
+        :meta private:
+        """
+        labels = cls.build_labels_for_k8s_executor_pod(
+            dag_id=dag_id,
+            task_id=task_id,
+            try_number=try_number,
+            map_index=map_index,
+            date=date,

Review Comment:
   That surprises me a bit; run_id is in the TI pk. That might have wider impact, though...
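
Including run_id in the labels is what separates pods of the same task and try number across DAG runs. However, run_id values such as `manual__2022-12-22T22:58:14+00:00` contain characters Kubernetes does not allow in label values (at most 63 characters; alphanumerics plus `-`, `_`, `.`; must start and end with an alphanumeric). Below is a minimal sanitizer sketch in the spirit of Airflow's `make_safe_label_value`. `safe_label_value` is a hypothetical stand-in, and the hash function and truncation scheme are assumptions. A short hash is appended whenever the value had to change, so distinct run_ids stay distinct after sanitizing.

```python
from __future__ import annotations

import hashlib
import re

MAX_LABEL_LEN = 63  # Kubernetes limit for label values


def safe_label_value(value: str) -> str:
    """Hypothetical sketch: coerce an arbitrary string into a valid label value."""
    # Drop leading/trailing non-alphanumerics and any character not allowed in a label value.
    safe = re.sub(r"^[^a-zA-Z0-9]*|[^a-zA-Z0-9_.-]|[^a-zA-Z0-9]*$", "", value)
    if len(safe) > MAX_LABEL_LEN or safe != value:
        # Keep a short digest of the original so different inputs do not collide.
        digest = hashlib.sha256(value.encode()).hexdigest()[:9]
        prefix = safe[: MAX_LABEL_LEN - len(digest) - 1]
        safe = f"{prefix}-{digest}" if prefix else digest
    return safe
```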


