Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/04/23 03:38:05 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #15500: Handle kubernetes watcher stream disconnection

ephraimbuddy opened a new pull request #15500:
URL: https://github.com/apache/airflow/pull/15500


   Currently, when the Kubernetes watch stream times out and we get a 410 error,
   we simply return resource version '0', which is not the latest version.
   
   According to the documentation, timing out is expected, and we should handle it
   by performing a list>watch>relist operation so we can continue watching
   from the latest resource version. See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
   
   This PR follows the list>watch>relist pattern.
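A minimal sketch of the list>watch>relist loop, written in plain Python so it runs without a cluster. It is not the code in this PR. `list_pods`, `watch_pods`, `handle_event` and `max_restarts` are hypothetical: `list_pods` stands in for `list_namespaced_pod(...)` and returns the list's `metadata.resource_version`, and `watch_pods` stands in for `watch.Watch().stream(...)` started at a given `resource_version`, yielding event dicts with `type` and `raw_object` keys.

```python
from typing import Any, Callable, Dict, Iterator

Event = Dict[str, Any]


def watch_with_relist(
    list_pods: Callable[[], str],
    watch_pods: Callable[[str], Iterator[Event]],
    handle_event: Callable[[Event], None],
    max_restarts: int = 3,
) -> str:
    """List, watch from the listed version, and relist when that version expires (410)."""
    # Initial list: gives a recent, consistent resourceVersion to start watching from.
    resource_version = list_pods()
    # A real watcher would loop forever; max_restarts only bounds this example.
    for _ in range(max_restarts):
        for event in watch_pods(resource_version):
            raw = event["raw_object"]
            if event["type"] == "ERROR":
                if raw.get("code") == 410:
                    # 410 Gone: our resourceVersion is too old. Relist instead of
                    # falling back to "0" (any point in history).
                    resource_version = list_pods()
                    break
                raise RuntimeError(f"unexpected watch error: {raw}")
            handle_event(event)
            # Remember the newest version we have processed.
            resource_version = raw["metadata"]["resourceVersion"]
        # The stream ended (server-side timeout or relist): watch again from resource_version.
    return resource_version
```

Relisting on 410 restarts the watch from a recent point rather than from "0"; as discussed in the review, events between the expired version and the new list can still be missed.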
   
   Closes: #15418
   This would likely also fix issues #14175, #13916, and #12644 (comment).
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #15500: Handle kubernetes watcher stream disconnection

github-actions[bot] commented on pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#issuecomment-827411782


   [The Workflow run](https://github.com/apache/airflow/actions/runs/788329486) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] JeremieDoctrine commented on pull request #15500: Handle kubernetes watcher stream disconnection

JeremieDoctrine commented on pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#issuecomment-1008780219


   @kaxil, did you find an alternate solution? We are also facing stream disconnections.





[GitHub] [airflow] ashb commented on a change in pull request #15500: Handle kubernetes watcher stream disconnection

ashb commented on a change in pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#discussion_r619455111



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,49 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:

Review comment:
       Cute name, but we don't need a separate class for this, do we?
   
   If we do, let's be more descriptive about the name and call it `SharedStateMixin`.
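For reference, a sketch of the shared-state ("Borg") pattern under the `SharedStateMixin` name suggested here. One pitfall of a mixin is that a single `_shared_state` dict defined on the base class would be shared by every subclass; the `__init_subclass__` hook below, which gives each subclass its own dict, is an addition of this sketch and not something the PR does. `ResourceVersionState` and `OtherState` are hypothetical classes used only for illustration.

```python
from typing import Any, Dict


class SharedStateMixin:
    """All instances of a given subclass share one attribute dictionary."""

    _shared_state: Dict[str, Any] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Give every subclass its own dict; otherwise all subclasses would
        # silently share the single dict defined on the mixin.
        cls._shared_state = {}

    def __init__(self) -> None:
        # Point this instance's attribute storage at the class-wide dict.
        self.__dict__ = self._shared_state


class ResourceVersionState(SharedStateMixin):
    """Hypothetical subclass used only for this example."""

    def __init__(self, resource_version: str = "") -> None:
        super().__init__()  # zero-argument form; self is already bound
        if resource_version:
            self.resource_version = resource_version


class OtherState(SharedStateMixin):
    """Second hypothetical subclass, showing that the dicts stay separate."""
```

Creating `ResourceVersionState("42")` and then `ResourceVersionState()` gives two distinct objects that both report `resource_version == "42"`, while `OtherState()` has no such attribute.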

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,49 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: client.CoreV1Api = None,
+        namespace: str = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)

Review comment:
       ```suggestion
           super().__init__()
   ```
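Zero-argument `super()` already returns a proxy bound to `self`, so passing `self` again supplies an extra positional argument and fails at runtime. A tiny demonstration with hypothetical `Base` and `Child` classes:

```python
class Base:
    def __init__(self) -> None:
        self.initialised = True


class Child(Base):
    def __init__(self, explicit_self: bool = False) -> None:
        if explicit_self:
            # Raises TypeError: Base.__init__ receives self twice.
            super().__init__(self)
        else:
            # Correct: super() already binds self.
            super().__init__()
```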

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,49 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: client.CoreV1Api = None,
+        namespace: str = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)
+        if resource_version:
+            # Update the state
+            self._shared_state.update(resource_version=resource_version)

Review comment:
       Since you have set `self.__dict__` to a shared dictionary, can't you do `self.resource_version = resource_version`?
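To illustrate the point: once `self.__dict__` is the shared dict, plain attribute assignment writes straight into it, so it is equivalent to calling `_shared_state.update(...)`. A minimal sketch with a hypothetical `Shared` class:

```python
from typing import Any, Dict


class Shared:
    _shared_state: Dict[str, Any] = {}

    def __init__(self) -> None:
        # Every instance uses the same dict for its attributes.
        self.__dict__ = self._shared_state


a = Shared()
b = Shared()
# Plain assignment stores into the shared dict...
a.resource_version = "4567"
# ...so b sees it too, exactly as with _shared_state.update(resource_version="4567").
```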

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -123,21 +156,20 @@ def run(self) -> None:
     def _run(
         self,
         kube_client: client.CoreV1Api,
-        resource_version: Optional[str],
+        resource_version: str,
         scheduler_job_id: str,
         kube_config: Any,
     ) -> Optional[str]:
         self.log.info('Event: and now my watch begins starting at resource_version: %s', resource_version)
         watcher = watch.Watch()

Review comment:
       I know you didn't change this in the PR, but how come this just calls `watch.Watch()` and doesn't use `_make_kube_watcher`?

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -171,16 +204,21 @@ def _run(
 
         return last_resource_version
 
-    def process_error(self, event: Any) -> str:
+    def process_error(
+        self,
+        event: Any,
+        kube_client: client.CoreV1Api,
+    ) -> str:
         """Process error response"""
         self.log.error('Encountered Error response from k8s list namespaced pod stream => %s', event)
         raw_object = event['raw_object']
         if raw_object['code'] == 410:
             self.log.info(
-                'Kubernetes resource version is too old, must reset to 0 => %s', (raw_object['message'],)
+                'Kubernetes resource version is too old, '
+                'relisting pods to get the latest version. Error => %s',
+                (raw_object['message'],),
             )
-            # Return resource version 0
-            return '0'
+            return get_resource_version(kube_client, self.namespace)

Review comment:
       It might also be more correct. If the version is too old and we start at a _later_ version, might that not mean we've missed some events? So re-listing is the "correct" behaviour.







[GitHub] [airflow] ephraimbuddy commented on pull request #15500: Handle kubernetes watcher stream disconnection

ephraimbuddy commented on pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#issuecomment-974610390


   > @ephraimbuddy Can you remember why we closed this PR -- do we have an alternate solution?
   
   I have forgotten why I closed this, but it looks related to the `allow_watch_bookmarks` option I mentioned above.





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #15500: Handle kubernetes watcher stream disconnection

ephraimbuddy commented on a change in pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#discussion_r619445297



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,50 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: Optional[client.CoreV1Api] = None,
+        namespace: Optional[str] = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)
+        if resource_version:
+            # Update the state
+            self._shared_state.update(resource_version=resource_version)
+        if not hasattr(self, 'resource_version'):
+            if not (kube_client and namespace):
+                raise AirflowException("kube_client and namespace is required")
+            re_version = get_resource_version(kube_client, namespace)
+            self._shared_state.update(resource_version=re_version)
+
+    @classmethod
+    def _drop(cls):
+        """Clear shared state (For testing purposes)"""
+        cls._shared_state = {}

Review comment:
       This shouldn't be part of Borg; it's here so that it only applies to this class. And the method is private so that anyone using it in code, rather than in tests, knows it's a deliberate act.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #15500: Handle kubernetes watcher stream disconnection

ephraimbuddy commented on a change in pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#discussion_r619477357



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -171,16 +204,21 @@ def _run(
 
         return last_resource_version
 
-    def process_error(self, event: Any) -> str:
+    def process_error(
+        self,
+        event: Any,
+        kube_client: client.CoreV1Api,
+    ) -> str:
         """Process error response"""
         self.log.error('Encountered Error response from k8s list namespaced pod stream => %s', event)
         raw_object = event['raw_object']
         if raw_object['code'] == 410:
             self.log.info(
-                'Kubernetes resource version is too old, must reset to 0 => %s', (raw_object['message'],)
+                'Kubernetes resource version is too old, '
+                'relisting pods to get the latest version. Error => %s',
+                (raw_object['message'],),
             )
-            # Return resource version 0
-            return '0'
+            return get_resource_version(kube_client, self.namespace)

Review comment:
       If we return `None` here, the code gets ugly, because we then have to check the return value of the `_run` method in the `run` method: if `_run` returns `None`, we call `get_resource_version` and then set `self.resource_version`.
   
   The behaviour is the same, but the code is uglier. This is roughly what it would look like:
   
   ```python
       def run(self) -> None:
           """Performs watching"""
           kube_client: client.CoreV1Api = get_kube_client()
           if not self.scheduler_job_id:
               raise AirflowException(NOT_STARTED_MESSAGE)
           while True:
               try:
                   self.resource_version = self._run(
                       kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
                   )
                   if self.resource_version is None:
                       self.resource_version = get_resource_version(kube_client, self.namespace)
               except ReadTimeoutError:
                   self.log.warning(
                       "There was a timeout error accessing the Kube API. Retrying request.", exc_info=True
                   )
                   time.sleep(1)
               except Exception:
                   self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
                   raise
               else:
                   self.log.warning(
                       'Watch died gracefully, starting back up with: last resource_version: %s',
                       self.resource_version,
                   )
   ```
   This gives the same result, but calling `get_resource_version` directly on error avoids all of that.
   







[GitHub] [airflow] ephraimbuddy edited a comment on pull request #15500: Handle kubernetes watcher stream disconnection

ephraimbuddy edited a comment on pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#issuecomment-883294131


   I came across `allow_watch_bookmarks` (https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks),
   which could give us the latest resource version, but the Python client has it disabled for now (https://github.com/kubernetes-client/python-base/pull/234/files).
   That would have solved this for us.
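Per the Kubernetes watch-bookmarks documentation, a watch opened with bookmarks allowed may receive `BOOKMARK` events whose object carries only an up-to-date `metadata.resourceVersion`. A sketch of how an event loop could use them, assuming event dicts shaped like the Python client's (`type`, `raw_object`); `consume` and `process_pod_event` are hypothetical names, and since the Python client had bookmarks disabled at the time, this is illustrative only.

```python
from typing import Any, Callable, Dict, Iterable

Event = Dict[str, Any]


def consume(events: Iterable[Event], process_pod_event: Callable[[Event], None]) -> str:
    """Return the latest resourceVersion seen, including from bookmarks.

    ERROR events are not handled in this sketch.
    """
    last_resource_version = ""
    for event in events:
        raw = event["raw_object"]
        # Both real changes and bookmarks carry metadata.resourceVersion.
        last_resource_version = raw["metadata"]["resourceVersion"]
        if event["type"] == "BOOKMARK":
            # Not a pod change: only advance the version, so a restarted watch
            # resumes from a recent point and is less likely to get a 410.
            continue
        process_pod_event(event)
    return last_resource_version
```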





[GitHub] [airflow] kaxil commented on pull request #15500: Handle kubernetes watcher stream disconnection

kaxil commented on pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#issuecomment-847065029


   @ephraimbuddy Conflicts here






[GitHub] [airflow] jedcunningham commented on a change in pull request #15500: Handle kubernetes watcher stream disconnection

jedcunningham commented on a change in pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#discussion_r620689702



##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -596,3 +598,49 @@ def test_process_status_catchall(self):
 
         self._run()
         self.watcher.watcher_queue.put.assert_not_called()
+
+    @mock.patch('airflow.executors.kubernetes_executor.get_latest_resource_version')
+    def test_process_error_event(self, mock_get_resource_version):
+        mock_get_resource_version.return_value = '43334'
+        self.pod.status.phase = 'Pending'
+        self.pod.metadata.resource_version = '43334'
+        raw_object = {"code": 410, "message": "too old resource version: 27272 (43334)"}
+        self.events.append({"type": "ERROR", "object": self.pod, "raw_object": raw_object})
+        self._run()
+        assert mock_get_resource_version.called

Review comment:
       ```suggestion
           mock_get_resource_version.assert_called_once()
   ```
   nit.
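The nit matters because `.called` is only a boolean: it stays `True` no matter how many times the mock was called, whereas `assert_called_once()` also checks the count. A small demonstration using `unittest.mock`; `called_once_check` is a hypothetical helper for the example.

```python
from unittest import mock


def called_once_check(mock_obj: mock.MagicMock) -> bool:
    """Return True if assert_called_once() passes for mock_obj."""
    try:
        mock_obj.assert_called_once()
    except AssertionError:
        return False
    return True


lister = mock.MagicMock(return_value="43334")
lister()
lister()
# `.called` is True after one call or after many.
print(lister.called)              # True
# assert_called_once() fails after two calls.
print(called_once_check(lister))  # False
```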

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -65,15 +65,46 @@
 
 
 class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+    """
+    Track resourceVersion from Kubernetes
+
+    All instances of this class share the same state
+    """
+
+    _shared_state = {}
+
+    def __init__(
+        self,
+        *,
+        kube_client: client.CoreV1Api = None,
+        namespace: str = None,
+        resource_version: Optional[str] = None,
+    ):
+        self.__dict__ = self._shared_state
+        if resource_version:
+            # Update the state
+            self.resource_version = resource_version
+        if not hasattr(self, 'resource_version'):
+            if not (kube_client and namespace):
+                raise AirflowException("kube_client and namespace is required to get resource version")
+            re_version = get_latest_resource_version(kube_client, namespace)
+            self._shared_state.update(resource_version=re_version)
 
-    _instance = None
-    resource_version = "0"
+    @classmethod
+    def _drop(cls):
+        """Clear shared state (For testing purposes)"""
+        cls._shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+
+def get_latest_resource_version(kube_client: client.CoreV1Api, namespace: str):

Review comment:
       ```suggestion
   def get_latest_resource_version(kube_client: client.CoreV1Api, namespace: str) -> str:
   ```

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -65,15 +65,46 @@
 
 
 class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+    """
+    Track resourceVersion from Kubernetes
+
+    All instances of this class share the same state
+    """
+
+    _shared_state = {}
+
+    def __init__(
+        self,
+        *,
+        kube_client: client.CoreV1Api = None,
+        namespace: str = None,
+        resource_version: Optional[str] = None,
+    ):
+        self.__dict__ = self._shared_state
+        if resource_version:
+            # Update the state
+            self.resource_version = resource_version
+        if not hasattr(self, 'resource_version'):
+            if not (kube_client and namespace):
+                raise AirflowException("kube_client and namespace is required to get resource version")
+            re_version = get_latest_resource_version(kube_client, namespace)
+            self._shared_state.update(resource_version=re_version)
 
-    _instance = None
-    resource_version = "0"
+    @classmethod
+    def _drop(cls):
+        """Clear shared state (For testing purposes)"""
+        cls._shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+
+def get_latest_resource_version(kube_client: client.CoreV1Api, namespace: str):
+    """
+    List pods to get the latest resource version
+
+    See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
+    """
+    pod_list = kube_client.list_namespaced_pod(namespace)
+    resource_version = pod_list.metadata.resource_version
+    return resource_version

Review comment:
       ```suggestion
       return pod_list.metadata.resource_version
   ```

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -596,3 +598,49 @@ def test_process_status_catchall(self):
 
         self._run()
         self.watcher.watcher_queue.put.assert_not_called()
+
+    @mock.patch('airflow.executors.kubernetes_executor.get_latest_resource_version')
+    def test_process_error_event(self, mock_get_resource_version):
+        mock_get_resource_version.return_value = '43334'
+        self.pod.status.phase = 'Pending'
+        self.pod.metadata.resource_version = '43334'
+        raw_object = {"code": 410, "message": "too old resource version: 27272 (43334)"}
+        self.events.append({"type": "ERROR", "object": self.pod, "raw_object": raw_object})
+        self._run()
+        assert mock_get_resource_version.called
+
+
+class TestResourceVersion(unittest.TestCase):
+    # pylint: disable=no-member
+    def tearDown(self) -> None:
+        ResourceVersion._drop()
+
+    def test_can_update_with_resource_version_arg(self):
+        resource_instance = ResourceVersion(resource_version='4567')
+        assert resource_instance.resource_version == '4567'
+
+    @mock.patch('airflow.executors.kubernetes_executor.get_latest_resource_version')
+    def test_different_instance_share_state(self, mock_get_resource_version):
+        kube_client = mock.MagicMock()
+        mock_get_resource_version.return_value = '4566'
+        resource_instance = ResourceVersion(kube_client=kube_client, namespace='mynamespace')
+        resource_instance2 = ResourceVersion(kube_client=kube_client, namespace='mynamespace')
+
+        assert resource_instance.resource_version == '4566'
+        assert resource_instance2.resource_version == '4566'
+        resource_instance3 = ResourceVersion(resource_version='6787')
+        resource_instance4 = ResourceVersion(kube_client=kube_client, namespace='mynamespace')
+        assert resource_instance.resource_version == '6787'
+        assert resource_instance2.resource_version == '6787'
+        assert resource_instance3.resource_version == '6787'
+        assert resource_instance4.resource_version == '6787'
+

Review comment:
       ```suggestion
   mock_get_resource_version.assert_called_once()
   
   ```
   
   Maybe check that we only listed once and used the shared state the rest of the time?
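A self-contained sketch of that check. `SharedResourceVersion` is a hypothetical stand-in for `ResourceVersion` that takes the lister as a callable instead of patching `get_latest_resource_version`, so the test runs without Airflow or a cluster.

```python
import unittest
from typing import Any, Callable, Dict
from unittest import mock


class SharedResourceVersion:
    """Hypothetical stand-in for ResourceVersion with an injectable lister."""

    _shared_state: Dict[str, Any] = {}

    def __init__(self, lister: Callable[[], str]) -> None:
        self.__dict__ = self._shared_state
        if not hasattr(self, "resource_version"):
            # Only the first instance pays for a list call.
            self.resource_version = lister()

    @classmethod
    def _drop(cls) -> None:
        """Clear shared state (for testing purposes)."""
        cls._shared_state = {}


class TestSharedResourceVersion(unittest.TestCase):
    def tearDown(self) -> None:
        SharedResourceVersion._drop()

    def test_lists_only_once(self) -> None:
        lister = mock.MagicMock(return_value="4566")
        first = SharedResourceVersion(lister)
        second = SharedResourceVersion(lister)
        assert first.resource_version == second.resource_version == "4566"
        # Later instances reuse the shared state instead of listing again.
        lister.assert_called_once()
```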







[GitHub] [airflow] github-actions[bot] commented on pull request #15500: Handle kubernetes watcher stream disconnection

github-actions[bot] commented on pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#issuecomment-825882944


   [The Workflow run](https://github.com/apache/airflow/actions/runs/778818910) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] jedcunningham commented on a change in pull request #15500: Handle kubernetes watcher stream disconnection

jedcunningham commented on a change in pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#discussion_r619505049



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -171,16 +204,21 @@ def _run(
 
         return last_resource_version
 
-    def process_error(self, event: Any) -> str:
+    def process_error(
+        self,
+        event: Any,
+        kube_client: client.CoreV1Api,
+    ) -> str:
         """Process error response"""
         self.log.error('Encountered Error response from k8s list namespaced pod stream => %s', event)
         raw_object = event['raw_object']
         if raw_object['code'] == 410:
             self.log.info(
-                'Kubernetes resource version is too old, must reset to 0 => %s', (raw_object['message'],)
+                'Kubernetes resource version is too old, '
+                'relisting pods to get the latest version. Error => %s',
+                (raw_object['message'],),
             )
-            # Return resource version 0
-            return '0'
+            return get_resource_version(kube_client, self.namespace)

Review comment:
       > if the version is too old, and we start at a _later_ version, might that not mean we've missed some events, so re-listing is the "correct" behaviour
   
   Yeah, this was always an issue and continues to be; this is just the right way to start watching again. What's missing is reconciling our state against the list to figure out which events we missed, but that is a bigger fish to fry, IMO.
   








[GitHub] [airflow] ephraimbuddy closed pull request #15500: Handle kubernetes watcher stream disconnection

ephraimbuddy closed pull request #15500:
URL: https://github.com/apache/airflow/pull/15500


   





[GitHub] [airflow] jedcunningham commented on pull request #15500: Handle kubernetes watcher stream disconnection

jedcunningham commented on pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#issuecomment-827885109


   >  we do a list and get back version A+5 (say) then we have "lost" the events A+1..A+4, i.e. we would never call `process_event()` for the events in this range.
   
   Unfortunately, we already have that same problem right now, since we (attempt to) rewatch from "0", a.k.a. _any_ point in history. Per the docs, the most recent version is preferred, so the behavior is likely pretty similar:
   https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter
   
   I think the piece we are missing is a step to reconcile our state against what the list returns, e.g. detect that we missed A+1..A+4 and act on them. More work is needed to harden this area, and this is the first step (on its own it doesn't buy us much beyond setting us up to be more complete later).
   
   @ephraimbuddy and I will spend some time trying to reproduce this scenario on demand to make sure we are starting down the right path.
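One way to picture the missing reconciliation step: compare the pod phases we last recorded from the watch against a fresh list and derive the changes the watch never delivered. This is a hypothetical sketch; the `reconcile` helper and the `{pod_name: phase}` maps are assumptions, not Airflow code, and a real implementation would still have to decide how to act on each difference.

```python
from typing import Dict, List, Tuple

Change = Tuple[str, str, str]  # (pod_name, change_kind, phase)


def reconcile(known: Dict[str, str], listed: Dict[str, str]) -> List[Change]:
    """Return synthetic events for differences between our state and a fresh list.

    known:  pod name -> phase we last processed from the watch
    listed: pod name -> phase reported by the relist
    """
    changes: List[Change] = []
    for name, phase in sorted(listed.items()):
        if name not in known:
            changes.append((name, "ADDED", phase))
        elif known[name] != phase:
            # e.g. we last saw Running, but the pod finished while we were not watching.
            changes.append((name, "MODIFIED", phase))
    for name in sorted(known.keys() - listed.keys()):
        # The pod is gone from the list; report the last phase we knew about.
        changes.append((name, "DELETED", known[name]))
    return changes
```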






[GitHub] [airflow] jedcunningham commented on a change in pull request #15500: Handle kubernetes watcher stream disconnection

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#discussion_r619374395



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,50 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: Optional[client.CoreV1Api] = None,
+        namespace: Optional[str] = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)
+        if resource_version:
+            # Update the state
+            self._shared_state.update(resource_version=resource_version)
+        if not hasattr(self, 'resource_version'):
+            if not (kube_client and namespace):
+                raise AirflowException("kube_client and namespace is required")
+            re_version = get_resource_version(kube_client, namespace)
+            self._shared_state.update(resource_version=re_version)
+
+    @classmethod
+    def _drop(cls):
+        """Clear shared state (For testing purposes)"""
+        cls._shared_state = {}
+
+
+def get_resource_version(kube_client: client.CoreV1Api, namespace: str):

Review comment:
       ```suggestion
   def get_latest_resource_version(kube_client: client.CoreV1Api, namespace: str):
   ```

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -123,21 +157,20 @@ def run(self) -> None:
     def _run(
         self,
         kube_client: client.CoreV1Api,
-        resource_version: Optional[str],
+        resource_version: str,
         scheduler_job_id: str,
         kube_config: Any,
     ) -> Optional[str]:
         self.log.info('Event: and now my watch begins starting at resource_version: %s', resource_version)
         watcher = watch.Watch()
 
         kwargs = {'label_selector': f'airflow-worker={scheduler_job_id}'}
-        if resource_version:
-            kwargs['resource_version'] = resource_version
+        kwargs['resource_version'] = resource_version

Review comment:
       Can't do suggestions across deleted lines, but just toss this in the initial dict.
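   What the reviewer suggests, sketched as a hypothetical standalone helper (`build_watch_kwargs` is not in the PR): once `resource_version` is always a `str`, it can go straight into the dict literal instead of being added conditionally.

```python
from typing import Dict


def build_watch_kwargs(scheduler_job_id: str, resource_version: str) -> Dict[str, str]:
    """Hypothetical helper: keyword arguments for the pod watch, built in one literal."""
    return {
        'label_selector': f'airflow-worker={scheduler_job_id}',
        # No `if resource_version:` guard needed now that the parameter is a plain str.
        'resource_version': resource_version,
    }


print(build_watch_kwargs('job-1', '12345'))
# {'label_selector': 'airflow-worker=job-1', 'resource_version': '12345'}
```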

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,50 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: Optional[client.CoreV1Api] = None,
+        namespace: Optional[str] = None,

Review comment:
       ```suggestion
           kube_client: client.CoreV1Api = None,
           namespace: str = None,
   ```
   
   If we are going to raise when these are None, should they be `Optional`?

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,50 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: Optional[client.CoreV1Api] = None,
+        namespace: Optional[str] = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)
+        if resource_version:
+            # Update the state
+            self._shared_state.update(resource_version=resource_version)
+        if not hasattr(self, 'resource_version'):
+            if not (kube_client and namespace):
+                raise AirflowException("kube_client and namespace is required")
+            re_version = get_resource_version(kube_client, namespace)
+            self._shared_state.update(resource_version=re_version)
+
+    @classmethod
+    def _drop(cls):
+        """Clear shared state (For testing purposes)"""
+        cls._shared_state = {}
+
+
+def get_resource_version(kube_client: client.CoreV1Api, namespace: str):
+    """
+    List pods to get the latest resource version
+
+    See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
+    """
+    pod_list = kube_client.list_namespaced_pod(namespace)
+    resource_version = pod_list.metadata.resource_version
+    return resource_version

Review comment:
       ```suggestion
       return pod_list.metadata.resource_version
   ```
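   The suggested one-liner can be exercised without a cluster. A sketch, assuming only that the client's `list_namespaced_pod(namespace)` returns an object exposing `metadata.resource_version` (as the pod list from the Kubernetes Python client does); `FakeCoreV1Api` is a hypothetical test double.

```python
from types import SimpleNamespace
from typing import Any, List


def get_latest_resource_version(kube_client: Any, namespace: str) -> str:
    """List pods once; the list response's metadata carries the current resourceVersion."""
    pod_list = kube_client.list_namespaced_pod(namespace)
    return pod_list.metadata.resource_version


class FakeCoreV1Api:
    """Hypothetical test double standing in for kubernetes.client.CoreV1Api."""

    def __init__(self, resource_version: str) -> None:
        self.resource_version = resource_version
        self.calls: List[str] = []

    def list_namespaced_pod(self, namespace: str) -> SimpleNamespace:
        # Record the namespace and return a minimal pod-list-shaped object.
        self.calls.append(namespace)
        return SimpleNamespace(items=[], metadata=SimpleNamespace(resource_version=self.resource_version))


fake = FakeCoreV1Api("98765")
print(get_latest_resource_version(fake, "airflow"))  # 98765
```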

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -171,16 +204,21 @@ def _run(
 
         return last_resource_version
 
-    def process_error(self, event: Any) -> str:
+    def process_error(
+        self,
+        event: Any,
+        kube_client: client.CoreV1Api,
+    ) -> str:
         """Process error response"""
         self.log.error('Encountered Error response from k8s list namespaced pod stream => %s', event)
         raw_object = event['raw_object']
         if raw_object['code'] == 410:
             self.log.info(
-                'Kubernetes resource version is too old, must reset to 0 => %s', (raw_object['message'],)
+                'Kubernetes resource version is too old, '
+                'relisting pods to get the latest version. Error => %s',
+                (raw_object['message'],),
             )
-            # Return resource version 0
-            return '0'
+            return get_resource_version(kube_client, self.namespace)

Review comment:
       I wonder if this would be cleaner if we return `None` here and let the next `_run` relist at the start? It would also keep the listing logic in one place rather than split between here and `_make_kube_watcher`. Thoughts?
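   A sketch of the reviewer's alternative, under simplifying assumptions: events are plain dicts, a normal event carries its `resource_version` at the top level (real watch events carry it on the object's metadata), and `list_latest`/`watch` are injected callables rather than the Kubernetes client. Returning `None` on a 410 keeps all listing in one place, at the start of the next pass.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

Event = Dict[str, Any]


def run_once(
    resource_version: Optional[str],
    list_latest: Callable[[], str],
    watch: Callable[[str], Iterable[Event]],
) -> Optional[str]:
    """One watch pass; returns the version to resume from, or None to force a relist."""
    if resource_version is None:
        # List first: a list response reports a current resourceVersion.
        resource_version = list_latest()
    last_resource_version = resource_version
    for event in watch(resource_version):
        if event['type'] == 'ERROR':
            raw_object = event['raw_object']
            if raw_object['code'] == 410:
                # Too old: signal the caller to relist on the next pass.
                return None
            raise RuntimeError(raw_object['message'])
        last_resource_version = event['resource_version']
    return last_resource_version


# Scripted demo: the first watch hits a 410, the second pass relists and resumes.
listed = iter(["100", "200"])
watched: List[str] = []


def fake_list() -> str:
    return next(listed)


def fake_watch(rv: str) -> List[Event]:
    watched.append(rv)
    if rv == "100":
        return [
            {'type': 'MODIFIED', 'resource_version': '101'},
            {'type': 'ERROR', 'raw_object': {'code': 410, 'message': 'too old'}},
        ]
    return [{'type': 'MODIFIED', 'resource_version': '201'}]


version = run_once(None, fake_list, fake_watch)  # watches from "100", hits 410, returns None
version = run_once(version, fake_list, fake_watch)  # relists to "200", ends at "201"
print(version, watched)  # 201 ['100', '200']
```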

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -265,7 +303,10 @@ def run_pod_async(self, pod: V1Pod, **kwargs):
         return resp
 
     def _make_kube_watcher(self) -> KubernetesJobWatcher:
-        resource_version = ResourceVersion().resource_version
+        resource_instance = ResourceVersion(
+            kube_client=self.kube_client, namespace=self.kube_config.kube_namespace
+        )
+        resource_version = resource_instance.resource_version  # pylint: disable=no-member

Review comment:
       ```suggestion
           resource_version = ResourceVersion(
               kube_client=self.kube_client, namespace=self.kube_config.kube_namespace
           ).resource_version  # pylint: disable=no-member
   ```
   
   (Untested, might have linting problems)

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -64,16 +64,50 @@
 KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]
 
 
-class ResourceVersion:
-    """Singleton for tracking resourceVersion from Kubernetes"""
+class Borg:
+    """Borg class making all instances share state"""
 
-    _instance = None
-    resource_version = "0"
+    _shared_state = {}
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    def __init__(self):
+        self.__dict__ = self._shared_state
+
+
+class ResourceVersion(Borg):
+    """Track resourceVersion from Kubernetes"""
+
+    def __init__(
+        self,
+        *,
+        kube_client: Optional[client.CoreV1Api] = None,
+        namespace: Optional[str] = None,
+        resource_version: Optional[str] = None,
+    ):
+        Borg.__init__(self)
+        if resource_version:
+            # Update the state
+            self._shared_state.update(resource_version=resource_version)
+        if not hasattr(self, 'resource_version'):
+            if not (kube_client and namespace):
+                raise AirflowException("kube_client and namespace is required")
+            re_version = get_resource_version(kube_client, namespace)
+            self._shared_state.update(resource_version=re_version)
+
+    @classmethod
+    def _drop(cls):
+        """Clear shared state (For testing purposes)"""
+        cls._shared_state = {}

Review comment:
       Should this not be part of `Borg`? Having a private method only for tests feels a little odd. Maybe make it a public method instead. Maybe `reset` would be a better name too?







[GitHub] [airflow] ephraimbuddy commented on pull request #15500: Handle kubernetes watcher stream disconnection

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#issuecomment-847636522


   > @ephraimbuddy Conflicts here
   
   Resolved. 





[GitHub] [airflow] ephraimbuddy commented on pull request #15500: Handle kubernetes watcher stream disconnection

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#issuecomment-883294131


   I came across `allow_watch_bookmarks` (https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks)
   for getting the latest resource version, but the Python client has it disabled for now: https://github.com/kubernetes-client/python-base/pull/234/files.
   That would have solved this for us.
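   For reference, a sketch of how bookmark events could be consumed if they were available, based on the Kubernetes API concepts page linked above: with `allowWatchBookmarks=true` the server may send events of type `BOOKMARK` whose object carries little more than `metadata.resourceVersion`. Events are modelled as plain dicts here (an assumption); this does not use the Python client's watch helper, which per the comment above did not support them.

```python
from typing import Any, Dict, Iterable, List, Tuple

Event = Dict[str, Any]


def consume(events: Iterable[Event], resource_version: str) -> Tuple[str, List[str]]:
    """Track the latest resourceVersion; report only non-bookmark events as pod changes."""
    changed_pods: List[str] = []
    for event in events:
        metadata = event['object']['metadata']
        if event['type'] != 'BOOKMARK':
            # In this sketch any non-bookmark event is treated as a pod change.
            changed_pods.append(metadata['name'])
        # Bookmarks only advance the version we can resume from.
        resource_version = metadata['resourceVersion']
    return resource_version, changed_pods


events = [
    {'type': 'ADDED', 'object': {'metadata': {'name': 'pod-a', 'resourceVersion': '5'}}},
    {'type': 'BOOKMARK', 'object': {'metadata': {'resourceVersion': '9'}}},
]
print(consume(events, '1'))  # ('9', ['pod-a'])
```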





[GitHub] [airflow] jedcunningham commented on a change in pull request #15500: Handle kubernetes watcher stream disconnection

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#discussion_r619481138



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -123,21 +156,20 @@ def run(self) -> None:
     def _run(
         self,
         kube_client: client.CoreV1Api,
-        resource_version: Optional[str],
+        resource_version: str,
         scheduler_job_id: str,
         kube_config: Any,
     ) -> Optional[str]:
         self.log.info('Event: and now my watch begins starting at resource_version: %s', resource_version)
         watcher = watch.Watch()

Review comment:
       `_make_kube_watcher` builds a `KubernetesJobWatcher`, not a `Watch`.







[GitHub] [airflow] kaxil commented on pull request #15500: Handle kubernetes watcher stream disconnection

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #15500:
URL: https://github.com/apache/airflow/pull/15500#issuecomment-972898970


   @ephraimbuddy Can you remember why we closed this PR -- do we have an alternate solution?

