You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/23 17:13:12 UTC

[GitHub] [airflow] jedcunningham commented on a change in pull request #15500: Handle kubernetes watcher stream disconnection

jedcunningham commented on a change in pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#discussion_r619374395



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,50 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: Optional[client.CoreV1Api] = None,
+        namespace: Optional[str] = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)
+        if resource_version:
+            # Update the state
+            self._shared_state.update(resource_version=resource_version)
+        if not hasattr(self, 'resource_version'):
+            if not (kube_client and namespace):
+                raise AirflowException("kube_client and namespace is required")
+            re_version = get_resource_version(kube_client, namespace)
+            self._shared_state.update(resource_version=re_version)
+
+    @classmethod
+    def _drop(cls):
+        """Clear shared state (For testing purposes)"""
+        cls._shared_state = {}
+
+
+def get_resource_version(kube_client: client.CoreV1Api, namespace: str):

Review comment:
       ```suggestion
   def get_latest_resource_version(kube_client: client.CoreV1Api, namespace: str):
   ```

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -123,21 +157,20 @@ def run(self) -> None:
     def _run(
         self,
         kube_client: client.CoreV1Api,
-        resource_version: Optional[str],
+        resource_version: str,
         scheduler_job_id: str,
         kube_config: Any,
     ) -> Optional[str]:
         self.log.info('Event: and now my watch begins starting at resource_version: %s', resource_version)
         watcher = watch.Watch()
 
         kwargs = {'label_selector': f'airflow-worker={scheduler_job_id}'}
-        if resource_version:
-            kwargs['resource_version'] = resource_version
+        kwargs['resource_version'] = resource_version

Review comment:
       Can't do suggestions across deleted lines, but just put `'resource_version': resource_version` directly in the initial `kwargs` dict instead of assigning it on a separate line.

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,50 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: Optional[client.CoreV1Api] = None,
+        namespace: Optional[str] = None,

Review comment:
       ```suggestion
           kube_client: client.CoreV1Api = None,
           namespace: str = None,
   ```
   
   If we are going to raise when these are None, should they really be typed as `Optional`?

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,50 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: Optional[client.CoreV1Api] = None,
+        namespace: Optional[str] = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)
+        if resource_version:
+            # Update the state
+            self._shared_state.update(resource_version=resource_version)
+        if not hasattr(self, 'resource_version'):
+            if not (kube_client and namespace):
+                raise AirflowException("kube_client and namespace is required")
+            re_version = get_resource_version(kube_client, namespace)
+            self._shared_state.update(resource_version=re_version)
+
+    @classmethod
+    def _drop(cls):
+        """Clear shared state (For testing purposes)"""
+        cls._shared_state = {}
+
+
+def get_resource_version(kube_client: client.CoreV1Api, namespace: str):
+    """
+    List pods to get the latest resource version
+
+    See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
+    """
+    pod_list = kube_client.list_namespaced_pod(namespace)
+    resource_version = pod_list.metadata.resource_version
+    return resource_version

Review comment:
       ```suggestion
       return pod_list.metadata.resource_version
   ```

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -171,16 +204,21 @@ def _run(
 
         return last_resource_version
 
-    def process_error(self, event: Any) -> str:
+    def process_error(
+        self,
+        event: Any,
+        kube_client: client.CoreV1Api,
+    ) -> str:
         """Process error response"""
         self.log.error('Encountered Error response from k8s list namespaced pod stream => %s', event)
         raw_object = event['raw_object']
         if raw_object['code'] == 410:
             self.log.info(
-                'Kubernetes resource version is too old, must reset to 0 => %s', (raw_object['message'],)
+                'Kubernetes resource version is too old, '
+                'relisting pods to get the latest version. Error => %s',
+                (raw_object['message'],),
             )
-            # Return resource version 0
-            return '0'
+            return get_resource_version(kube_client, self.namespace)

Review comment:
       I wonder if this would be cleaner if we return `None` here and let the next `_run` relist at the start? It would also keep the list specifics in one spot vs both here and in `_make_kube_watcher`. Thoughts?

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -265,7 +303,10 @@ def run_pod_async(self, pod: V1Pod, **kwargs):
         return resp
 
     def _make_kube_watcher(self) -> KubernetesJobWatcher:
-        resource_version = ResourceVersion().resource_version
+        resource_instance = ResourceVersion(
+            kube_client=self.kube_client, namespace=self.kube_config.kube_namespace
+        )
+        resource_version = resource_instance.resource_version  # pylint: disable=no-member

Review comment:
       ```suggestion
           resource_version = ResourceVersion(
               kube_client=self.kube_client, namespace=self.kube_config.kube_namespace
           ).resource_version  # pylint: disable=no-member
   ```
   
   (Untested, might have linting problems)

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,50 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: Optional[client.CoreV1Api] = None,
+        namespace: Optional[str] = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)
+        if resource_version:
+            # Update the state
+            self._shared_state.update(resource_version=resource_version)
+        if not hasattr(self, 'resource_version'):
+            if not (kube_client and namespace):
+                raise AirflowException("kube_client and namespace is required")
+            re_version = get_resource_version(kube_client, namespace)
+            self._shared_state.update(resource_version=re_version)
+
+    @classmethod
+    def _drop(cls):
+        """Clear shared state (For testing purposes)"""
+        cls._shared_state = {}

Review comment:
       Should this not be part of `Borg`? Having a private method only for tests feels a little odd. Maybe make it a public method instead. Maybe `reset` would be a better name too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org