You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/23 20:08:21 UTC

[GitHub] [airflow] ashb commented on a change in pull request #15500: Handle kubernetes watcher stream disconnection

ashb commented on a change in pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#discussion_r619455111



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,49 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:

Review comment:
       Cute name, but we don't need a separate class for this, do we?
   
   If we do, let's be more descriptive about the name and call it `SharedStateMixin`.

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,49 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: client.CoreV1Api = None,
+        namespace: str = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)

Review comment:
       ```suggestion
           super().__init__()
   ```

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,49 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: client.CoreV1Api = None,
+        namespace: str = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)
+        if resource_version:
+            # Update the state
+            self._shared_state.update(resource_version=resource_version)

Review comment:
       Since you have set `self.__dict__` to a shared dictionary, can't you do `self.resource_version = resource_version`?

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -123,21 +156,20 @@ def run(self) -> None:
     def _run(
         self,
         kube_client: client.CoreV1Api,
-        resource_version: Optional[str],
+        resource_version: str,
         scheduler_job_id: str,
         kube_config: Any,
     ) -> Optional[str]:
         self.log.info('Event: and now my watch begins starting at resource_version: %s', resource_version)
         watcher = watch.Watch()

Review comment:
       I know you didn't change this in the PR, but how come this just calls `watch.Watch()` and doesn't use `_make_kube_watcher`?

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -171,16 +204,21 @@ def _run(
 
         return last_resource_version
 
-    def process_error(self, event: Any) -> str:
+    def process_error(
+        self,
+        event: Any,
+        kube_client: client.CoreV1Api,
+    ) -> str:
         """Process error response"""
         self.log.error('Encountered Error response from k8s list namespaced pod stream => %s', event)
         raw_object = event['raw_object']
         if raw_object['code'] == 410:
             self.log.info(
-                'Kubernetes resource version is too old, must reset to 0 => %s', (raw_object['message'],)
+                'Kubernetes resource version is too old, '
+                'relisting pods to get the latest version. Error => %s',
+                (raw_object['message'],),
             )
-            # Return resource version 0
-            return '0'
+            return get_resource_version(kube_client, self.namespace)

Review comment:
       It might also be more correct: if the version is too old and we restart from a _later_ version, might we not have missed some events in between? If so, re-listing is the "correct" behaviour.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org