Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/08 17:30:28 UTC

[GitHub] [airflow] o-nikolas commented on a diff in pull request #28161: WIP AIP-51 - Executor Coupling in Logging

o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1043628357


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -173,88 +170,15 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif conf.get("core", "executor") == "KubernetesExecutor":
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
-
-                if len(ti.hostname) >= 63:
-                    # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname
-                    # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards
-                    # on any label of a FQDN.
-                    pod_list = kube_client.list_namespaced_pod(conf.get("kubernetes_executor", "namespace"))
-                    matches = [
-                        pod.metadata.name
-                        for pod in pod_list.items
-                        if pod.metadata.name.startswith(ti.hostname)
-                    ]
-                    if len(matches) == 1:
-                        if len(matches[0]) > len(ti.hostname):
-                            ti.hostname = matches[0]
-
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
-
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get("kubernetes_executor", "namespace"),
-                    container="base",
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
-
-                for line in res:
-                    log += line.decode()
-
-            except Exception as f:
-                log += f"*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n"
-                return log, {"end_of_log": True}
         else:
-            import httpx
 
-            url = self._get_log_retrieval_url(ti, log_relative_path)
-            log += f"*** Log file does not exist: {location}\n"
-            log += f"*** Fetching from: {url}\n"
-            try:
-                timeout = None  # No timeout
-                try:
-                    timeout = conf.getint("webserver", "log_fetch_timeout_sec")
-                except (AirflowConfigException, ValueError):
-                    pass
-
-                signer = JWTSigner(
-                    secret_key=conf.get("webserver", "secret_key"),
-                    expiration_time_in_seconds=conf.getint(
-                        "webserver", "log_request_clock_grace", fallback=30
-                    ),
-                    audience="task-instance-logs",
-                )
-                response = httpx.get(
-                    url,
-                    timeout=timeout,
-                    headers={"Authorization": signer.generate_signed_token({"filename": log_relative_path})},
-                )
-                response.encoding = "utf-8"
-
-                if response.status_code == 403:
-                    log += (
-                        "*** !!!! Please make sure that all your Airflow components (e.g. "
-                        "schedulers, webservers and workers) have "
-                        "the same 'secret_key' configured in 'webserver' section and "
-                        "time is synchronized on all your machines (for example with ntpd) !!!!!\n***"
-                    )
-                    log += (
-                        "*** See more at https://airflow.apache.org/docs/apache-airflow/"
-                        "stable/configurations-ref.html#secret-key\n***"
-                    )
-                # Check if the resource was properly fetched
-                response.raise_for_status()
-
-                log += "\n" + response.text
-            except Exception as e:
-                log += f"*** Failed to fetch log file from worker. {str(e)}\n"
-                return log, {"end_of_log": True}
+            executor = ExecutorLoader.get_default_executor()
+            task_log = executor.get_task_log(ti, log_relative_path=log_relative_path)
+
+            if isinstance(task_log, tuple):
+                return task_log

Review Comment:
   Previously, in the exception/end-of-log cases, both the `log` and `{"end_of_log": True}` were returned. So shouldn't you return the whole tuple here?
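
   For illustration, here is a minimal sketch of how the call site could propagate that tuple (names are taken from the diff above; the trailing `log +=` fallthrough is an assumption about the surrounding `_read()` body, not code from this PR):

   ```python
   # Sketch only: get_task_log may return either a plain log string, or a
   # (log, {"end_of_log": True}) tuple in the exception/end-of-log case.
   task_log = executor.get_task_log(ti, log_relative_path=log_relative_path)

   if isinstance(task_log, tuple):
       # Return both the log text and the metadata dict, preserving the
       # previous behaviour of returning (log, {"end_of_log": True}).
       return task_log

   # Otherwise, append the fetched text to the log accumulated so far.
   log += task_log
   ```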



##########
airflow/executors/base_executor.py:
##########
@@ -373,6 +374,62 @@ def validate_airflow_tasks_run_command(command: list[str]) -> tuple[str | None,
             return dag_id, task_id
         return None, None
 
+    @staticmethod
+    def _get_log_retrieval_url(ti: TaskInstance, log_relative_path: str) -> str:
+        url = urljoin(
+            f"http://{ti.hostname}:{conf.get('logging', 'WORKER_LOG_SERVER_PORT')}/log/",
+            log_relative_path,
+        )
+        return url
+
+    def get_task_log(self, ti: TaskInstance, log_relative_path: str) -> str | tuple[str, dict[str, bool]]:

Review Comment:
   This code to fetch logs from workers over the network is not related to executors, so I don't think this implementation belongs in the base executor. I'd actually propose putting this back into `file_task_handler.py` and making this method on the base executor just a stub with a `pass`.
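
   As a rough sketch of that proposal (the signature is copied from the diff above; the docstring and stub body are only illustrative):

   ```python
   def get_task_log(self, ti: TaskInstance, log_relative_path: str) -> str | tuple[str, dict[str, bool]]:
       """Return task logs for ``ti``; executors with their own log source override this."""
       # The base executor has no executor-specific log source, so this is a no-op stub.
       pass
   ```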



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org