You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/12/02 23:34:16 UTC

[GitHub] [airflow] jedcunningham commented on a change in pull request #19572: Simplify KubernetesPodOperator

jedcunningham commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r761542110



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +130,101 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> V1Pod:
         """
-        Launches the pod synchronously and waits for completion.
+        Launches the pod asynchronously.
+
+        :param pod:
+        :return:
+        """
+        return self.run_pod_async(pod)
+
+    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+        """
+        Waits for the pod to reach phase other than ``Pending``
 
         :param pod:
         :param startup_timeout: Timeout (in seconds) for startup of the pod
             (if pod is pending for too long, fails task)
         :return:
         """
-        resp = self.run_pod_async(pod)
         curr_time = dt.now()
-        if resp.status.start_time is None:
-            while self.pod_not_started(pod):
-                self.log.warning("Pod not yet started: %s", pod.metadata.name)
-                delta = dt.now() - curr_time
-                if delta.total_seconds() >= startup_timeout:
-                    msg = (
-                        f"Pod took longer than {startup_timeout} seconds to start. "
-                        "Check the pod events in kubernetes to determine why."
-                    )
-                    raise AirflowException(msg)
-                time.sleep(1)
-
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optional[str]]:
+        while True:
+            remote_pod = self.read_pod(pod)
+            if remote_pod.status.phase != PodStatus.PENDING:
+                break
+            self.log.warning("Pod not yet started: %s", pod.metadata.name)
+            delta = dt.now() - curr_time
+            if delta.total_seconds() >= startup_timeout:
+                msg = (
+                    f"Pod took longer than {startup_timeout} seconds to start. "
+                    "Check the pod events in kubernetes to determine why."
+                )
+                raise PodLaunchFailedException(msg)
+            time.sleep(1)
+
+    def follow_container_logs(self, pod: V1Pod, container_name: str):

Review comment:
       ```suggestion
       def follow_container_logs(self, pod: V1Pod, container_name: str) -> bool:
   ```
   
   This one feels a little odd: it will only ever return `True` or raise, right? In that case, does the return value buy us anything at all?

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -51,10 +54,23 @@ def should_retry_start_pod(exception: Exception) -> bool:
 class PodStatus:
     """Status of the PODs"""
 
-    PENDING = 'pending'
-    RUNNING = 'running'
-    FAILED = 'failed'
-    SUCCEEDED = 'succeeded'
+    PENDING = 'Pending'
+    RUNNING = 'Running'
+    FAILED = 'Failed'
+    SUCCEEDED = 'Succeeded'
+
+    terminal_states = {FAILED, SUCCEEDED}
+
+
+def container_is_running(pod: V1Pod, container_name: str):

Review comment:
       ```suggestion
   def container_is_running(pod: V1Pod, container_name: str) -> bool:
   ```

##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +130,101 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> V1Pod:
         """
-        Launches the pod synchronously and waits for completion.
+        Launches the pod asynchronously.
+
+        :param pod:
+        :return:
+        """
+        return self.run_pod_async(pod)
+
+    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+        """
+        Waits for the pod to reach phase other than ``Pending``
 
         :param pod:
         :param startup_timeout: Timeout (in seconds) for startup of the pod
             (if pod is pending for too long, fails task)
         :return:
         """
-        resp = self.run_pod_async(pod)
         curr_time = dt.now()
-        if resp.status.start_time is None:
-            while self.pod_not_started(pod):
-                self.log.warning("Pod not yet started: %s", pod.metadata.name)
-                delta = dt.now() - curr_time
-                if delta.total_seconds() >= startup_timeout:
-                    msg = (
-                        f"Pod took longer than {startup_timeout} seconds to start. "
-                        "Check the pod events in kubernetes to determine why."
-                    )
-                    raise AirflowException(msg)
-                time.sleep(1)
-
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optional[str]]:
+        while True:
+            remote_pod = self.read_pod(pod)
+            if remote_pod.status.phase != PodStatus.PENDING:
+                break
+            self.log.warning("Pod not yet started: %s", pod.metadata.name)
+            delta = dt.now() - curr_time
+            if delta.total_seconds() >= startup_timeout:
+                msg = (
+                    f"Pod took longer than {startup_timeout} seconds to start. "
+                    "Check the pod events in kubernetes to determine why."
+                )
+                raise PodLaunchFailedException(msg)
+            time.sleep(1)
+
+    def follow_container_logs(self, pod: V1Pod, container_name: str):
+        """
+        Follows the logs of container and streams to airflow logging.
+        Returns when container exits.
         """
-        Monitors a pod and returns the final state, pod and xcom result
+        container_stopped = False
+        read_logs_since_sec = None
+        last_log_time = None
+        while True:
+            try:
+                logs = self.read_pod_logs(
+                    pod=pod,
+                    container_name=container_name,
+                    timestamps=True,
+                    since_seconds=read_logs_since_sec,
+                )
+                for line in logs:  # type: bytes
+                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                    self.log.info(message)
+                    if timestamp:
+                        last_log_time = timestamp
+            except BaseHTTPError:  # Catches errors like ProtocolError(TimeoutError).
+                self.log.warning(
+                    'Failed to read logs for pod %s',
+                    pod.metadata.name,
+                    exc_info=True,
+                )
+
+            if container_stopped is True:
+                break
+
+            if last_log_time:
+                delta = pendulum.now() - last_log_time
+                read_logs_since_sec = math.ceil(delta.total_seconds())
+
+            time.sleep(1)
+
+            if self.container_is_running(pod, container_name=container_name):
+                self.log.info('Container %s is running', pod.metadata.name)
+                self.log.warning('Pod %s log read interrupted', pod.metadata.name)
+            else:
+                container_stopped = True  # fetch logs once more and exit
+        return container_stopped
+
+    def await_container(self, pod: V1Pod, container_name: str):

Review comment:
       ```suggestion
       def await_container(self, pod: V1Pod, container_name: str) -> None:
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org