You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/09 17:24:09 UTC

[GitHub] [airflow] snjypl opened a new pull request, #28161: WIP AIP-51 - Executor Coupling in Logging

snjypl opened a new pull request, #28161:
URL: https://github.com/apache/airflow/pull/28161

   Fixes: https://github.com/apache/airflow/issues/27931
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1065603602


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -749,6 +751,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
                 self.log.debug("Could not find key: %s", str(key))
         self.event_buffer[key] = state, None
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):

Review Comment:
   It's just a simpler way to do this https://github.com/apache/airflow/pull/28546/files#diff-e7f34f73940eb52d92bb991abedc1c963431c5373c12dff739c8fb7d03e93d3aL195



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1049219335


##########
airflow/executors/base_executor.py:
##########
@@ -304,6 +304,9 @@ def execute_async(
         """
         raise NotImplementedError()
 
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:

Review Comment:
   Just noticed that this is missing a doc string. Do you mind adding one that explains what this method does and that expectation is child classes implement it (if desired) and also documenting the params/return values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl closed pull request #28161: WIP AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl closed pull request #28161: WIP AIP-51 - Executor Coupling in Logging
URL: https://github.com/apache/airflow/pull/28161


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1065010894


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -749,6 +751,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
                 self.log.debug("Could not find key: %s", str(key))
         self.event_buffer[key] = state, None
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):

Review Comment:
   I see, fair enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1064578797


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -749,6 +751,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
                 self.log.debug("Could not find key: %s", str(key))
         self.event_buffer[key] = state, None
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):

Review Comment:
   @o-nikolas  those changes were part of https://github.com/apache/airflow/pull/28546 . i simply moved it from `file_task_handler` to `kubernetes_executor` while resolving merge conflicts. 
   
   @dstandish  might be able to help with it. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on PR #28161:
URL: https://github.com/apache/airflow/pull/28161#issuecomment-1367534800

   > Fantastic unit tests! Thanks for adding those.
   > 
   > Just left one other small request for a doc string.
   > 
   > Also, are you able to do any UAT/manual testing to ensure things are working as expected as compared to before this change?
   
   @o-nikolas  i did some manual testing with  different executors. it is working as expected. 
   
   in main there is an issue with  `LocalKubernetesExecutor`. scheduler is not serving the logs for local task. https://github.com/apache/airflow/pull/28638 . i tested with a patched version, and it is working fine. 
   
   
   
   `celery executor` 
   ```
   airflow-worker-0.airflow-worker.airflow.svc.cluster.local
   *** Local log file does not exist: /opt/airflow/logs/dag_id=example_bash_operator/run_id=manual__2022-12-29T19:06:34.783246+00:00/task_id=runme_0/attempt=1.log
   *** Failed to fetch log from executor. Falling back to fetching log from worker.
   *** Fetching from: http://airflow-worker-0.airflow-worker.airflow.svc.cluster.local:8793/log/dag_id=example_bash_operator/run_id=manual__2022-12-29T19:06:34.783246+00:00/task_id=runme_0/attempt=1.log
   
   [2022-12-29, 19:06:37 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_0 manual__2022-12-29T19:06:34.783246+00:00 [queued]>
   [2022-12-29, 19:06:37 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: example_bash_operator.runme_0 manual__2022-12-29T19:06:34.783246+00:00 [queued]>
   [2022-12-29, 19:06:37 UTC] {taskinstance.py:1282} INFO - 
   ```
   
   `LocalKubernetesExecutor ` local task
   ```
   airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local
   *** Local log file does not exist: /opt/airflow/logs/dag_id=example_local_kubernetes_executor/run_id=manual__2022-12-29T19:11:14.004608+00:00/task_id=task_with_local_executor/attempt=1.log
   *** Failed to fetch log from executor. Falling back to fetching log from worker.
   *** Fetching from: http://airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local:8793/log/dag_id=example_local_kubernetes_executor/run_id=manual__2022-12-29T19:11:14.004608+00:00/task_id=task_with_local_executor/attempt=1.log
   
   [2022-12-29, 19:11:15 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: example_local_kubernetes_executor.task_with_local_executor manual__2022-12-29T19:11:14.004608+00:00 [queued]>
   ```
   `LocalKubernetesExecutor ` k8s task
   
   ```
   example-local-kubernetes-executor-task-with-kubernetes-q2qf6dpa
   *** Local log file does not exist: /opt/airflow/logs/dag_id=example_local_kubernetes_executor/run_id=manual__2022-12-29T19:11:14.004608+00:00/task_id=task_with_kubernetes_executor/attempt=1.log
   *** Trying to get logs (last 100 lines) from worker pod example-local-kubernetes-executor-task-with-kubernetes-q2qf6dpa ***
   
   
   /home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py:925 DeprecationWarning: The namespace option in [kubernetes] has been moved to the namespace option in [kubernetes_executor] - the old setting has been used, but please update your config.
   [2022-12-29, 19:11:30 UTC] {dagbag.py:538} INFO - Filling up the DagBag from /opt/airflow/dags/example_local_kubernetes_executor.py
   ```
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on PR #28161:
URL: https://github.com/apache/airflow/pull/28161#issuecomment-1384526101

   Anyone have some time to give this a second review/approval? Would be nice to get this merged for @snjypl
   Maybe @potiuk, @eladkal or @pierrejeambrun?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] eladkal commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
eladkal commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1080630500


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -773,6 +775,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
             # do this once, so only do it when we remove the task from running
             self.event_buffer[key] = state, None
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):
+            namespace = pod_override.metadata.namespace
+        return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
+
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:
+
+        try:
+            from airflow.kubernetes.pod_generator import PodGenerator
+
+            client = get_kube_client()
+
+            log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                try_number=ti.try_number,
+                map_index=ti.map_index,
+                run_id=ti.run_id,
+                airflow_worker=ti.queued_by_job_id,
+            )
+            namespace = self._get_pod_namespace(ti)
+            pod_list = client.list_namespaced_pod(
+                namespace=namespace,
+                label_selector=selector,
+            ).items
+            if not pod_list:
+                raise RuntimeError("Cannot find pod for ti %s", ti)
+            elif len(pod_list) > 1:
+                raise RuntimeError("Found multiple pods for ti %s: %s", ti, pod_list)

Review Comment:
   Not really part of this PR but feels like the right place to ask.
   
   Why do we raise these exceptions and not write the issue to the log and return it? (Like lines 285-287)
   
   ```
           except Exception as f:
               log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
               return log, {"end_of_log": True}
   ```
   
   I wonder if this is the reason users sometimes don't see the task log and it makes them harder to find the root cause like in https://github.com/apache/airflow/issues/29025 ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #28161: WIP AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1043906313


##########
airflow/executors/base_executor.py:
##########
@@ -373,6 +374,62 @@ def validate_airflow_tasks_run_command(command: list[str]) -> tuple[str | None,
             return dag_id, task_id
         return None, None
 
+    @staticmethod
+    def _get_log_retrieval_url(ti: TaskInstance, log_relative_path: str) -> str:
+        url = urljoin(
+            f"http://{ti.hostname}:{conf.get('logging', 'WORKER_LOG_SERVER_PORT')}/log/",
+            log_relative_path,
+        )
+        return url
+
+    def get_task_log(self, ti: TaskInstance, log_relative_path: str) -> str | tuple[str, dict[str, bool]]:

Review Comment:
   Perfect, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas merged pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas merged PR #28161:
URL: https://github.com/apache/airflow/pull/28161


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1081083607


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -773,6 +775,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
             # do this once, so only do it when we remove the task from running
             self.event_buffer[key] = state, None
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):
+            namespace = pod_override.metadata.namespace
+        return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
+
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:
+
+        try:
+            from airflow.kubernetes.pod_generator import PodGenerator
+
+            client = get_kube_client()
+
+            log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                try_number=ti.try_number,
+                map_index=ti.map_index,
+                run_id=ti.run_id,
+                airflow_worker=ti.queued_by_job_id,
+            )
+            namespace = self._get_pod_namespace(ti)
+            pod_list = client.list_namespaced_pod(
+                namespace=namespace,
+                label_selector=selector,
+            ).items
+            if not pod_list:
+                raise RuntimeError("Cannot find pod for ti %s", ti)
+            elif len(pod_list) > 1:
+                raise RuntimeError("Found multiple pods for ti %s: %s", ti, pod_list)

Review Comment:
   @eladkal  i think, #29025  is more about the error  that we log around these part.  https://github.com/apache/airflow/blob/1e385ac36cf84cca92cc18bb528e381904b44783/airflow/executors/kubernetes_executor.py#L690-L714 .
   
   These logs i believe are part of the scheduler logs and won't be visible as part of the task's log since we only fetch the logs from task's k8s pod in `kubernetes_executor.get_task_log`. 
   
   
   regarding the exceptions, am not sure if i understand you correctly, but  i think, those exceptions are caught by the enclosing try/except and returned to the user. 
   
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by "pierrejeambrun (via GitHub)" <gi...@apache.org>.
pierrejeambrun commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1083578430


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -204,91 +234,24 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif self._should_check_k8s(ti.queue):
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-                from airflow.kubernetes.pod_generator import PodGenerator
-
-                client = get_kube_client()
-
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                selector = PodGenerator.build_selector_for_k8s_executor_pod(
-                    dag_id=ti.dag_id,
-                    task_id=ti.task_id,
-                    try_number=ti.try_number,
-                    map_index=ti.map_index,
-                    run_id=ti.run_id,
-                    airflow_worker=ti.queued_by_job_id,
-                )
-                namespace = self._get_pod_namespace(ti)
-                pod_list = client.list_namespaced_pod(
-                    namespace=namespace,
-                    label_selector=selector,
-                ).items
-                if not pod_list:
-                    raise RuntimeError("Cannot find pod for ti %s", ti)
-                elif len(pod_list) > 1:
-                    raise RuntimeError("Found multiple pods for ti %s: %s", ti, pod_list)
-                res = client.read_namespaced_pod_log(
-                    name=pod_list[0].metadata.name,
-                    namespace=namespace,
-                    container="base",
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
+        else:
+            log += f"*** Local log file does not exist: {location}\n"
+            executor = ExecutorLoader.get_default_executor()
+            task_log = None
 
-                for line in res:
-                    log += line.decode()
+            if hasattr(executor, "get_task_log"):

Review Comment:
   Why do we need this check ? I think it only helps for custom executor that are not `BaseExecutor`, but other PR removed such check I believe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #28161: WIP AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1044920796


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif self._should_check_k8s(ti.queue):
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
+        else:
 
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            log += f"*** Log file does not exist: {location}\n"
+            executor = ExecutorLoader.get_default_executor()

Review Comment:
   You may need to first check if that method exists on the executor class before calling it or update some of the other executors, pending the result of the discussion here: https://github.com/apache/airflow/issues/28276#issuecomment-1344899475



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1081083607


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -773,6 +775,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
             # do this once, so only do it when we remove the task from running
             self.event_buffer[key] = state, None
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):
+            namespace = pod_override.metadata.namespace
+        return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
+
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:
+
+        try:
+            from airflow.kubernetes.pod_generator import PodGenerator
+
+            client = get_kube_client()
+
+            log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                try_number=ti.try_number,
+                map_index=ti.map_index,
+                run_id=ti.run_id,
+                airflow_worker=ti.queued_by_job_id,
+            )
+            namespace = self._get_pod_namespace(ti)
+            pod_list = client.list_namespaced_pod(
+                namespace=namespace,
+                label_selector=selector,
+            ).items
+            if not pod_list:
+                raise RuntimeError("Cannot find pod for ti %s", ti)
+            elif len(pod_list) > 1:
+                raise RuntimeError("Found multiple pods for ti %s: %s", ti, pod_list)

Review Comment:
   @eladkal  i think, #29025  is more about the error  that we log around these part.  https://github.com/apache/airflow/blob/1e385ac36cf84cca92cc18bb528e381904b44783/airflow/executors/kubernetes_executor.py#L690-L714 . these logs i believe are part of the scheduler logs.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #28161: WIP AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1043628357


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -173,88 +170,15 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif conf.get("core", "executor") == "KubernetesExecutor":
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
-
-                if len(ti.hostname) >= 63:
-                    # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname
-                    # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards
-                    # on any label of a FQDN.
-                    pod_list = kube_client.list_namespaced_pod(conf.get("kubernetes_executor", "namespace"))
-                    matches = [
-                        pod.metadata.name
-                        for pod in pod_list.items
-                        if pod.metadata.name.startswith(ti.hostname)
-                    ]
-                    if len(matches) == 1:
-                        if len(matches[0]) > len(ti.hostname):
-                            ti.hostname = matches[0]
-
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get("kubernetes_executor", "namespace"),
-                    container="base",
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
-
-                for line in res:
-                    log += line.decode()
-
-            except Exception as f:
-                log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
-                return log, {"end_of_log": True}
         else:
-            import httpx
 
-            url = self._get_log_retrieval_url(ti, log_relative_path)
-            log += f"*** Log file does not exist: {location}\n"
-            log += f"*** Fetching from: {url}\n"
-            try:
-                timeout = None  # No timeout
-                try:
-                    timeout = conf.getint("webserver", "log_fetch_timeout_sec")
-                except (AirflowConfigException, ValueError):
-                    pass
-
-                signer = JWTSigner(
-                    secret_key=conf.get("webserver", "secret_key"),
-                    expiration_time_in_seconds=conf.getint(
-                        "webserver", "log_request_clock_grace", fallback=30
-                    ),
-                    audience="task-instance-logs",
-                )
-                response = httpx.get(
-                    url,
-                    timeout=timeout,
-                    headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
-                )
-                response.encoding = "utf-8"
-
-                if response.status_code == 403:
-                    log += (
-                        "*** !!!! Please make sure that all your Airflow components (e.g. "
-                        "schedulers, webservers and workers) have "
-                        "the same 'secret_key' configured in 'webserver' section and "
-                        "time is synchronized on all your machines (for example with ntpd) !!!!!\n***"
-                    )
-                    log += (
-                        "*** See more at https://airflow.apache.org/docs/apache-airflow/"
-                        "stable/configurations-ref.html#secret-key\n***"
-                    )
-                # Check if the resource was properly fetched
-                response.raise_for_status()
-
-                log += "\n" + response.text
-            except Exception as e:
-                log += f"*** Failed to fetch log file from worker. {str(e)}\n"
-                return log, {"end_of_log": True}
+            executor = ExecutorLoader.get_default_executor()
+            task_log = executor.get_task_log(ti, log_relative_path=log_relative_path)
+
+            if isinstance(task_log, tuple):
+                return task_log

Review Comment:
   Previously in the exception/end of log cases the `log` as well as `{"end_of_log": True}` was returned. So shouldn't you return the whole tuple here?



##########
airflow/executors/base_executor.py:
##########
@@ -373,6 +374,62 @@ def validate_airflow_tasks_run_command(command: list[str]) -> tuple[str | None,
             return dag_id, task_id
         return None, None
 
+    @staticmethod
+    def _get_log_retrieval_url(ti: TaskInstance, log_relative_path: str) -> str:
+        url = urljoin(
+            f"http://{ti.hostname}:{conf.get('logging', 'WORKER_LOG_SERVER_PORT')}/log/",
+            log_relative_path,
+        )
+        return url
+
+    def get_task_log(self, ti: TaskInstance, log_relative_path: str) -> str | tuple[str, dict[str, bool]]:

Review Comment:
   This code to fetch logs from workers over the network is not related to executors, so I don't think this implementation belongs in the base executor. I'd actually propose to put this back into `file_task_handler.py` and for this method on the base executor to just be a stub method with a `pass`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on a diff in pull request #28161: WIP AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1043664573


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -173,88 +170,15 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif conf.get("core", "executor") == "KubernetesExecutor":
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
-
-                if len(ti.hostname) >= 63:
-                    # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname
-                    # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards
-                    # on any label of a FQDN.
-                    pod_list = kube_client.list_namespaced_pod(conf.get("kubernetes_executor", "namespace"))
-                    matches = [
-                        pod.metadata.name
-                        for pod in pod_list.items
-                        if pod.metadata.name.startswith(ti.hostname)
-                    ]
-                    if len(matches) == 1:
-                        if len(matches[0]) > len(ti.hostname):
-                            ti.hostname = matches[0]
-
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get("kubernetes_executor", "namespace"),
-                    container="base",
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
-
-                for line in res:
-                    log += line.decode()
-
-            except Exception as f:
-                log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
-                return log, {"end_of_log": True}
         else:
-            import httpx
 
-            url = self._get_log_retrieval_url(ti, log_relative_path)
-            log += f"*** Log file does not exist: {location}\n"
-            log += f"*** Fetching from: {url}\n"
-            try:
-                timeout = None  # No timeout
-                try:
-                    timeout = conf.getint("webserver", "log_fetch_timeout_sec")
-                except (AirflowConfigException, ValueError):
-                    pass
-
-                signer = JWTSigner(
-                    secret_key=conf.get("webserver", "secret_key"),
-                    expiration_time_in_seconds=conf.getint(
-                        "webserver", "log_request_clock_grace", fallback=30
-                    ),
-                    audience="task-instance-logs",
-                )
-                response = httpx.get(
-                    url,
-                    timeout=timeout,
-                    headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
-                )
-                response.encoding = "utf-8"
-
-                if response.status_code == 403:
-                    log += (
-                        "*** !!!! Please make sure that all your Airflow components (e.g. "
-                        "schedulers, webservers and workers) have "
-                        "the same 'secret_key' configured in 'webserver' section and "
-                        "time is synchronized on all your machines (for example with ntpd) !!!!!\n***"
-                    )
-                    log += (
-                        "*** See more at https://airflow.apache.org/docs/apache-airflow/"
-                        "stable/configurations-ref.html#secret-key\n***"
-                    )
-                # Check if the resource was properly fetched
-                response.raise_for_status()
-
-                log += "\n" + response.text
-            except Exception as e:
-                log += f"*** Failed to fetch log file from worker. {str(e)}\n"
-                return log, {"end_of_log": True}
+            executor = ExecutorLoader.get_default_executor()
+            task_log = executor.get_task_log(ti, log_relative_path=log_relative_path)
+
+            if isinstance(task_log, tuple):
+                return task_log

Review Comment:
   task_log  is the returned tuple | str.  incase of exception, we are returning the tuple



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on a diff in pull request #28161: WIP AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1043666056


##########
airflow/executors/base_executor.py:
##########
@@ -373,6 +374,62 @@ def validate_airflow_tasks_run_command(command: list[str]) -> tuple[str | None,
             return dag_id, task_id
         return None, None
 
+    @staticmethod
+    def _get_log_retrieval_url(ti: TaskInstance, log_relative_path: str) -> str:
+        url = urljoin(
+            f"http://{ti.hostname}:{conf.get('logging', 'WORKER_LOG_SERVER_PORT')}/log/",
+            log_relative_path,
+        )
+        return url
+
+    def get_task_log(self, ti: TaskInstance, log_relative_path: str) -> str | tuple[str, dict[str, bool]]:

Review Comment:
   i was thinking, getting the log from network  would be the default base executor behavior and getting from the pod a special case for kube executor.   will put this code back in file_task_handler. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #28161: WIP AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1044914857


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif self._should_check_k8s(ti.queue):
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
+        else:
 
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            log += f"*** Log file does not exist: {location}\n"

Review Comment:
   ```suggestion
               log += f"*** Local log file does not exist, trying to fetch logs from executor environment ***\n\n"
   ```
   
   This more closely matches what was there previously as well as the new context you added.



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif self._should_check_k8s(ti.queue):
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
+        else:
 
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            log += f"*** Log file does not exist: {location}\n"
+            executor = ExecutorLoader.get_default_executor()

Review Comment:
   You may need to first check if that method exists on the executor class before calling it, pending the result of the discussion here: https://github.com/apache/airflow/issues/28276#issuecomment-1344899475



##########
tests/utils/test_log_handlers.py:
##########
@@ -267,36 +264,3 @@ def test_log_retrieval_valid(self, create_task_instance):
         log_url_ti.hostname = "hostname"
         url = FileTaskHandler._get_log_retrieval_url(log_url_ti, "DYNAMIC_PATH")
         assert url == "http://hostname:8793/log/DYNAMIC_PATH"
-
-
-@pytest.mark.parametrize(
-    "config, queue, expected",
-    [
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), None, False),
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), "kubernetes", False),
-        (dict(AIRFLOW__CORE__EXECUTOR="KubernetesExecutor"), None, True),
-        (dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "any", False),
-        (dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "kubernetes", True),
-        (
-            dict(
-                AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor",
-                AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere",
-            ),
-            "hithere",
-            True,
-        ),
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "any", False),
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "kubernetes", True),
-        (
-            dict(
-                AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor",
-                AIRFLOW__LOCAL_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere",
-            ),
-            "hithere",
-            True,
-        ),
-    ],
-)
-def test__should_check_k8s(config, queue, expected):
-    with patch.dict("os.environ", **config):
-        assert FileTaskHandler._should_check_k8s(queue) == expected

Review Comment:
   You've nicely refactored `FileTaskHandler._read` to be unittestable. You can mock `os.path.exists(location)` to return false and also mock the kubernetes executor, then ensure `get_task_log` was called once with the expected ti input. You should then swap the executor to one that doesn't have an implementation and ensure you get None back (you shouldn't need to mock in that case since it has no implementation) and that the `_get_task_log_from_worker` method is called once (will need to mock that one).



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif self._should_check_k8s(ti.queue):
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
+        else:
 
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            log += f"*** Log file does not exist: {location}\n"
+            executor = ExecutorLoader.get_default_executor()
+            task_log = executor.get_task_log(ti)
 
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get("kubernetes_executor", "namespace"),
-                    container="base",
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
+            if isinstance(task_log, tuple):
+                return task_log
 
-                for line in res:
-                    log += line.decode()
+            if task_log is None:
+                task_log = self._get_task_log_from_worker(ti, log, log_relative_path=log_relative_path)

Review Comment:
   Should add a log here, saying that we're now falling back to fetching from worker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1095134144


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -773,6 +775,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
             # do this once, so only do it when we remove the task from running
             self.event_buffer[key] = state, None
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):
+            namespace = pod_override.metadata.namespace
+        return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
+
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:
+
+        try:
+            from airflow.kubernetes.pod_generator import PodGenerator
+
+            client = get_kube_client()
+
+            log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                try_number=ti.try_number,
+                map_index=ti.map_index,
+                run_id=ti.run_id,
+                airflow_worker=ti.queued_by_job_id,
+            )
+            namespace = self._get_pod_namespace(ti)
+            pod_list = client.list_namespaced_pod(
+                namespace=namespace,
+                label_selector=selector,
+            ).items
+            if not pod_list:
+                raise RuntimeError("Cannot find pod for ti %s", ti)
+            elif len(pod_list) > 1:
+                raise RuntimeError("Found multiple pods for ti %s: %s", ti, pod_list)
+            res = client.read_namespaced_pod_log(
+                name=pod_list[0].metadata.name,

Review Comment:
   This is not code that is new to this PR. It was just moved to a different location. If you see the `airflow/utils/log/file_task_handler.py` module, this code existed there before these changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on PR #28161:
URL: https://github.com/apache/airflow/pull/28161#issuecomment-1371567456

   > @o-nikolas i did some manual testing with different executors. it is working as expected.
   
   Fantastic! Thanks @snjypl!
   
   Sorry for the delayed response but I was out on holiday for the past couple weeks, just catching back up now.
   
   Looks like there are some conflicts in main that you need to rebase and resolve, otherwise it LGTM!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on PR #28161:
URL: https://github.com/apache/airflow/pull/28161#issuecomment-1374224718

   @o-nikolas   i have resolved the merge conflicts. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1059102738


##########
airflow/executors/base_executor.py:
##########
@@ -304,6 +304,9 @@ def execute_async(
         """
         raise NotImplementedError()
 
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> None | str | tuple[str, dict[str, bool]]:

Review Comment:
   Done
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1064578797


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -749,6 +751,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
                 self.log.debug("Could not find key: %s", str(key))
         self.event_buffer[key] = state, None
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):

Review Comment:
   @o-nikolas  those changes were part of https://github.com/apache/airflow/pull/28546 . i simply moved it from `file_task_handler` to `kubernetes_executor`. 
   
   @dstandish  might be able to help with it. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] o-nikolas commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1064208434


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -749,6 +751,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
                 self.log.debug("Could not find key: %s", str(key))
         self.event_buffer[key] = state, None
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):

Review Comment:
   Do we really want to suppress any and all exceptions here? What is the specific error case this is covering?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on PR #28161:
URL: https://github.com/apache/airflow/pull/28161#issuecomment-1396069446

   @potiuk @eladkal @pierrejeambrun  will be great if you could review this PR whenever you get a chance ! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by "snjypl (via GitHub)" <gi...@apache.org>.
snjypl commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1083706656


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -204,91 +234,24 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif self._should_check_k8s(ti.queue):
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-                from airflow.kubernetes.pod_generator import PodGenerator
-
-                client = get_kube_client()
-
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-                selector = PodGenerator.build_selector_for_k8s_executor_pod(
-                    dag_id=ti.dag_id,
-                    task_id=ti.task_id,
-                    try_number=ti.try_number,
-                    map_index=ti.map_index,
-                    run_id=ti.run_id,
-                    airflow_worker=ti.queued_by_job_id,
-                )
-                namespace = self._get_pod_namespace(ti)
-                pod_list = client.list_namespaced_pod(
-                    namespace=namespace,
-                    label_selector=selector,
-                ).items
-                if not pod_list:
-                    raise RuntimeError("Cannot find pod for ti %s", ti)
-                elif len(pod_list) > 1:
-                    raise RuntimeError("Found multiple pods for ti %s: %s", ti, pod_list)
-                res = client.read_namespaced_pod_log(
-                    name=pod_list[0].metadata.name,
-                    namespace=namespace,
-                    container="base",
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
+        else:
+            log += f"*** Local log file does not exist: {location}\n"
+            executor = ExecutorLoader.get_default_executor()
+            task_log = None
 
-                for line in res:
-                    log += line.decode()
+            if hasattr(executor, "get_task_log"):

Review Comment:
   thanks @pierrejeambrun  i went through the  discussion https://github.com/apache/airflow/issues/28276#issuecomment-1352590074 .  i have removed the `hasattr` check. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] XD-DENG commented on a diff in pull request #28161: AIP-51 - Executor Coupling in Logging

Posted by "XD-DENG (via GitHub)" <gi...@apache.org>.
XD-DENG commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1094021733


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -773,6 +775,57 @@ def _change_state(self, key: TaskInstanceKey, state: str | None, pod_id: str, na
             # do this once, so only do it when we remove the task from running
             self.event_buffer[key] = state, None
 
+    @staticmethod
+    def _get_pod_namespace(ti: TaskInstance):
+        pod_override = ti.executor_config.get("pod_override")
+        namespace = None
+        with suppress(Exception):
+            namespace = pod_override.metadata.namespace
+        return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
+
+    def get_task_log(self, ti: TaskInstance, log: str = "") -> str | tuple[str, dict[str, bool]]:
+
+        try:
+            from airflow.kubernetes.pod_generator import PodGenerator
+
+            client = get_kube_client()
+
+            log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            selector = PodGenerator.build_selector_for_k8s_executor_pod(
+                dag_id=ti.dag_id,
+                task_id=ti.task_id,
+                try_number=ti.try_number,
+                map_index=ti.map_index,
+                run_id=ti.run_id,
+                airflow_worker=ti.queued_by_job_id,
+            )
+            namespace = self._get_pod_namespace(ti)
+            pod_list = client.list_namespaced_pod(
+                namespace=namespace,
+                label_selector=selector,
+            ).items
+            if not pod_list:
+                raise RuntimeError("Cannot find pod for ti %s", ti)
+            elif len(pod_list) > 1:
+                raise RuntimeError("Found multiple pods for ti %s: %s", ti, pod_list)
+            res = client.read_namespaced_pod_log(
+                name=pod_list[0].metadata.name,

Review Comment:
   Checking this part of code: why do we need to do the works above to get the pod name? The `ti.hostname` is just the pod name, isn't it?
   
   cc @o-nikolas @snjypl 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] snjypl commented on pull request #28161: WIP AIP-51 - Executor Coupling in Logging

Posted by GitBox <gi...@apache.org>.
snjypl commented on PR #28161:
URL: https://github.com/apache/airflow/pull/28161#issuecomment-1350057453

   > Marking my review as request for changes regarding unit testing (see [here](https://github.com/apache/airflow/pull/28161/files#r1044921868))
   @o-nikolas  i have added some unittests,  please take a look when you get a chance.  Thanks. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org