Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/02 03:03:44 UTC

[GitHub] [airflow] XD-DENG opened a new pull request, #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

XD-DENG opened a new pull request, #28047:
URL: https://github.com/apache/airflow/pull/28047

   Currently `KubernetesExecutor`'s `multi_namespace_mode` requires the Scheduler to have a cluster-scope role on the Kubernetes cluster, because it uses the function `list_pod_for_all_namespaces()`.
   
   However, in certain enterprise environments, it's not possible for users to have a cluster-scope role. For example, they may only get permissions in a namespace, rather than on the whole cluster. Always granting the Scheduler pod a cluster-scope role is not good from a security perspective either.
   
   This change aims to make `KubernetesExecutor`'s `multi_namespace_mode` work without a cluster-scope role.
   
   (This was discussed on the mailing list at https://lists.apache.org/thread/xxsppw7qwvky78l6nx41vlz593gj4zqb)
   
   I'm sure folks will have suggestions and we will need to further refine this change, but I would like to start the discussion by creating this PR first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on a diff in pull request #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1038734853


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -444,6 +471,23 @@ def __init__(self):
         self.kubernetes_queue: str | None = None
         super().__init__(parallelism=self.kube_config.parallelism)
 
+    def _list_pods(self, query_kwargs):
+        if self.kube_config.multi_namespace_mode:
+            if self.kube_config.multi_namespace_mode_namespace_list:
+                pods = []
+                for namespace in self.kube_config.multi_namespace_mode_namespace_list:
+                    pods.extend(
+                        self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
+                    )
+            else:
+                pods = self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
+        else:
+            pods = self.kube_client.list_namespaced_pod(
+                namespace=self.kube_config.kube_namespace, **query_kwargs
+            ).items
+
+        return pods
+

Review Comment:
   ```suggestion
       @cached_property
       def _namespaces(self):
           if self.kube_config.multi_namespace_mode_namespace_list:
               return self.kube_config.multi_namespace_mode_namespace_list
           elif self.kube_config.multi_namespace_mode:
               return []
           else:
               return [self.kube_config.kube_namespace]
   
       def _list_pods(self, query_kwargs):
           if not self._namespaces:
               yield from self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
           for namespace in self._namespaces:
               yield from self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
   
   ```
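
The suggestion above can be tried outside Airflow with a stub client. The sketch below is only an illustration: `FakeKubeClient` and `PodLister` are hypothetical names, the stub returns strings instead of pod objects, and only the two list calls shown in the diff are imitated.

```python
from __future__ import annotations

from functools import cached_property
from types import SimpleNamespace


class FakeKubeClient:
    """Hypothetical stand-in for the Kubernetes client used by the executor."""

    def __init__(self, pods_by_namespace: dict[str, list[str]]):
        self.pods_by_namespace = pods_by_namespace
        self.calls: list[str] = []  # records which API call was made

    def list_namespaced_pod(self, namespace: str, **kwargs):
        self.calls.append(f"namespaced:{namespace}")
        return SimpleNamespace(items=list(self.pods_by_namespace.get(namespace, [])))

    def list_pod_for_all_namespaces(self, **kwargs):
        self.calls.append("all")
        items = [pod for pods in self.pods_by_namespace.values() for pod in pods]
        return SimpleNamespace(items=items)


class PodLister:
    """Hypothetical holder for the two members proposed in the review."""

    def __init__(self, kube_client, kube_config):
        self.kube_client = kube_client
        self.kube_config = kube_config

    @cached_property
    def _namespaces(self) -> list[str]:
        if self.kube_config.multi_namespace_mode_namespace_list:
            return self.kube_config.multi_namespace_mode_namespace_list
        elif self.kube_config.multi_namespace_mode:
            return []  # empty list means "all namespaces"
        return [self.kube_config.kube_namespace]

    def _list_pods(self, query_kwargs):
        if not self._namespaces:
            yield from self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
        for namespace in self._namespaces:
            yield from self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
```

With a namespace list configured, only namespaced calls are made, so the stub never sees the cluster-wide call that needs the cluster-scope role.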



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -140,9 +141,14 @@ def _run(
 
         last_resource_version: str | None = None
         if self.multi_namespace_mode:
-            list_worker_pods = functools.partial(
-                watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
-            )
+            if self.kube_config.multi_namespace_mode_namespace_list:
+                list_worker_pods = functools.partial(
+                    watcher.stream, kube_client.list_namespaced_pod, self.namespace, **kwargs
+                )
+            else:
+                list_worker_pods = functools.partial(
+                    watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
+                )

Review Comment:
   might be cleaner to move this if block to a private method and i think it can be simplified to this:
   ```python
       def _list_worker_pods(self, watcher: watch.Watch, kube_client, **kwargs):
           if self.namespace is None:
               return watcher.stream(kube_client.list_pod_for_all_namespaces, **kwargs)
           else:
               return watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs)
   ```
   
   then used like this
   ```python
           for event in self._list_worker_pods(watcher, kube_client, **kwargs):
   ```
   
   I think this makes sense because this class doesn't really need to know about the Airflow config.  It's just a watcher, and it's either watching one namespace or watching all namespaces.  If namespace is None, it watches all; otherwise, it watches that namespace.
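
A minimal, self-contained sketch of that dispatch, assuming stub classes in place of the real watcher and client. `FakeWatch`, `FakeClient`, and `JobWatcher` are hypothetical names, and the event dicts only loosely imitate what a real watch stream yields.

```python
from __future__ import annotations


class FakeWatch:
    """Hypothetical stand-in for the Kubernetes watch object."""

    def stream(self, func, *args, **kwargs):
        # Returns a generator of events, one per listed item.
        for item in func(*args, **kwargs):
            yield {"type": "ADDED", "object": item}


class FakeClient:
    """Hypothetical client returning plain strings instead of pod objects."""

    def list_pod_for_all_namespaces(self, **kwargs):
        return ["ns-a/pod-1", "ns-b/pod-2"]

    def list_namespaced_pod(self, namespace, **kwargs):
        return [f"{namespace}/pod-1"]


class JobWatcher:
    def __init__(self, namespace: str | None):
        # None means "watch every namespace"; no Airflow config is consulted here.
        self.namespace = namespace

    def _list_worker_pods(self, watcher, kube_client, **kwargs):
        if self.namespace is None:
            return watcher.stream(kube_client.list_pod_for_all_namespaces, **kwargs)
        return watcher.stream(kube_client.list_namespaced_pod, self.namespace, **kwargs)


one_namespace = list(JobWatcher("ns-a")._list_worker_pods(FakeWatch(), FakeClient()))
all_namespaces = list(JobWatcher(None)._list_worker_pods(FakeWatch(), FakeClient()))
```

The watcher only needs the namespace it was constructed with, which is the point of the comment: the decision about which namespaces to watch can live in the executor.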



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -287,15 +293,35 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [None]
             )
-            ResourceVersion().resource_version = "0"
-            self.kube_watcher = self._make_kube_watcher()
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers

Review Comment:
   this is somewhat redundant logic. maybe you could just calculate "namespaces" once (e.g. in the property i suggested adding to k8s exec) and then pass that along when creating this class.  
   
   then you could do something like 
   ```
       def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
           if not self.namespaces:
               return {None: self._make_kube_watcher(None)}
           return {x: self._make_kube_watcher(x) for x in self.namespaces}
   ```
   
   i don't feel great about having `None` as a key in the dict... it's not super clear as a signaling mechanism. (and, of course, it's not a namespace, though sometimes it appears that way in the code).
   
   maybe we'll think of something as we work through it.
   



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -497,16 +541,16 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
 
             # Try run_id first
             kwargs["label_selector"] += ",run_id=" + pod_generator.make_safe_label_value(ti.run_id)
-            pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
-            if pod_list.items:
+            pod_list = self._list_pods(kwargs)
+            if pod_list:

Review Comment:
   ```suggestion
               for _ in self._list_pods(kwargs):
   ```
   if you accept above suggestion, you'd have to do this
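
The reason the call site has to change: once `_list_pods` is a generator function, `if pod_list:` is always true, because a generator object is truthy even when it yields nothing. A small sketch of the pitfall and one possible emptiness check follows; `list_pods` and `has_any_pod` are hypothetical helpers, not code from the PR.

```python
def list_pods(pods):
    """Generator stand-in for the suggested ``_list_pods``."""
    yield from pods


# A generator object is always truthy, even when it will yield nothing.
empty_is_truthy = bool(list_pods([]))


def has_any_pod(pod_iter) -> bool:
    # Pull at most one item; a sentinel distinguishes "no items" from a falsy item.
    sentinel = object()
    return next(iter(pod_iter), sentinel) is not sentinel
```

Either this kind of check or the `for _ in ...` loop from the suggestion stops after the first pod, so no more pods are pulled from the generator than are needed.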



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -497,16 +541,16 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
 
             # Try run_id first
             kwargs["label_selector"] += ",run_id=" + pod_generator.make_safe_label_value(ti.run_id)
-            pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
-            if pod_list.items:
+            pod_list = self._list_pods(kwargs)
+            if pod_list:
                 continue
             # Fallback to old style of using execution_date
             kwargs["label_selector"] = (
                 f"{base_label_selector},"
                 f"execution_date={pod_generator.datetime_to_label_safe_datestring(ti.execution_date)}"
             )
-            pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
-            if pod_list.items:
+            pod_list = self._list_pods(kwargs)

Review Comment:
   and same here



##########
airflow/kubernetes/kube_config.py:
##########
@@ -57,6 +57,14 @@ def __init__(self):
         # create, watch, get, and delete pods in this namespace.
         self.kube_namespace = conf.get(self.kubernetes_section, "namespace")
         self.multi_namespace_mode = conf.getboolean(self.kubernetes_section, "multi_namespace_mode")
+        if self.multi_namespace_mode and conf.get(
+            self.kubernetes_section, "multi_namespace_mode_namespace_list"
+        ):
+            self.multi_namespace_mode_namespace_list = conf.get(
+                self.kubernetes_section, "multi_namespace_mode_namespace_list"
+            ).split(",")
+        else:
+            self.multi_namespace_mode_namespace_list = None

Review Comment:
   ```suggestion
           self.multi_namespace_mode_namespace_list = conf.getjson(
               self.kubernetes_section, "multi_namespace_mode_namespace_list"
           )
   ```
   maybe this would be a good use of json. or could add a `getlist`
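
For comparison, here is a sketch of the two parsing styles under discussion. Both functions are hypothetical helpers written for illustration, not Airflow configuration APIs. The comma-separated variant also strips whitespace, which a bare `.split(",")` does not do (`"a, b".split(",")` gives `["a", " b"]`).

```python
from __future__ import annotations

import json


def parse_namespace_list(raw: str | None) -> list[str] | None:
    """Hypothetical helper: parse a comma-separated namespace list."""
    if not raw:
        return None
    # Strip whitespace and drop empty entries such as one left by a trailing comma.
    namespaces = [part.strip() for part in raw.split(",") if part.strip()]
    return namespaces or None


def parse_namespace_json(raw: str | None) -> list[str] | None:
    """Hypothetical helper: parse the same option written as a JSON array."""
    if not raw:
        return None
    value = json.loads(raw)
    if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
        raise ValueError("expected a JSON array of strings")
    return value or None
```

The JSON form is stricter about quoting but removes any ambiguity about whitespace; the comma-separated form is easier to type in a config file or environment variable.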



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -287,15 +293,35 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [None]
             )
-            ResourceVersion().resource_version = "0"
-            self.kube_watcher = self._make_kube_watcher()
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers
+
+    def _health_check_kube_watchers(self):
+        for namespace, kube_watcher in self.kube_watchers.items():
+            if kube_watcher.is_alive():
+                self.log.debug("KubeJobWatcher for namespace %s alive, continuing", namespace)
+            else:
+                self.log.error(
+                    (
+                        "Error while health checking kube watcher process for namespace %s. "
+                        "Process died for unknown reasons"
+                    ),
+                    namespace,
+                )
+                ResourceVersion().resource_version[namespace] = "0"
+                self.kube_watchers[namespace] = self._make_kube_watcher(namespace)
 
     def run_next(self, next_job: KubernetesJobType) -> None:

Review Comment:
   in this method construct_pod is called with `self.namespace`. does this make sense with multi-namespace mode?  how are additional namespaces used if pods are always created with the one namespace?





[GitHub] [airflow] XD-DENG commented on pull request #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1335992881

   Hi @dstandish, I've clarified/addressed your earlier comments. Could you please take another look when you get time? Thanks a lot!




[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1344334903

   And even if you do not have as big a machine, you can control the number of parallel tests with the ``--parallelism`` flag to stress your system even more.




[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042917394


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -444,6 +471,23 @@ def __init__(self):
         self.kubernetes_queue: str | None = None
         super().__init__(parallelism=self.kube_config.parallelism)
 
+    def _list_pods(self, query_kwargs):
+        if self.kube_config.multi_namespace_mode:
+            if self.kube_config.multi_namespace_mode_namespace_list:
+                pods = []
+                for namespace in self.kube_config.multi_namespace_mode_namespace_list:
+                    pods.extend(
+                        self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
+                    )
+            else:
+                pods = self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
+        else:
+            pods = self.kube_client.list_namespaced_pod(
+                namespace=self.kube_config.kube_namespace, **query_kwargs
+            ).items
+
+        return pods
+

Review Comment:
   sure thing





[GitHub] [airflow] dstandish commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1341969878

   you may want to look at the file task handler.  when reading logs from a k8s task, it seems to assume a single namespace




[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1038652486


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -66,7 +67,7 @@ class ResourceVersion:
     """Singleton for tracking resourceVersion from Kubernetes"""
 
     _instance = None
-    resource_version = "0"
+    resource_version: dict[str | None, str | None] = {}

Review Comment:
   Changed to `resource_version: dict[str | None, str] = {}` in commit
   https://github.com/apache/airflow/pull/28047/commits/a23c33e90bd6666286e03cc676aba9a6322d570c
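
To illustrate what a per-namespace `resource_version` mapping looks like on a singleton, here is a minimal sketch. The `__new__` implementation is an assumption made for the example (the diff above shows only the class attributes), and the namespace names are made up.

```python
from __future__ import annotations


class ResourceVersion:
    """Singleton tracking the last seen resourceVersion per namespace."""

    _instance: ResourceVersion | None = None
    # Class-level dict: shared on purpose, since only one instance ever exists.
    resource_version: dict[str, str] = {}

    def __new__(cls):
        # Assumed singleton mechanics: always hand back the same object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance


ResourceVersion().resource_version["ns-a"] = "42"
ResourceVersion().resource_version["ns-b"] = "7"
# Resetting one namespace, as the health check does for a dead watcher,
# leaves the entries for other namespaces untouched.
ResourceVersion().resource_version["ns-a"] = "0"
```

Because every call returns the same object, a reset made in the health check is visible to whichever code next reads the mapping.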





[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1344324755

   I think you should look at the memory usage while you are running it. To me it looks like some of the modifications there accidentally use vast amounts of memory @XD-DENG. That blows up our parallel tests in CI (we have 64 GB of memory and the tests run in parallel, so no single test has all 64 GB available).
   
   And if this assessment is correct, then this is likely not only a CI issue but a "real" issue that might impact our users, so we'd better diagnose it.
   
   What I can do (and will do now) is assign the "debug ci resources" label to this PR and rebase/re-run it. This produces a nice output of the resources used while the tests are running, so we will see what the memory usage is and be sure that this is the case (but exit code 137 has always indicated memory problems so far).
   
   BTW. In the future, as a committer, you can also do that @XD-DENG :D - simply assigning the "debug ci resources" label before the PR build is run will turn the resource monitoring on.
   
   Not sure what the reason for the alleged memory usage is, but later on, when this one gets fixed (hopefully?), you can also assign the "use public runners" label and rebase/re-push the change. In that case the same tests will run on the much more memory-constrained public runners (8 GB), where tests run with far less parallelism, so we can see whether the change triggers a more "moderate" memory increase that might impact our non-committer contributors.
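
Some background on why 137 points at memory: by shell convention, an exit status above 128 means the process was terminated by signal number (status - 128), and 137 - 128 = 9 is SIGKILL, which is what the kernel's out-of-memory killer sends. SIGKILL can come from other sources too, so this is a strong hint rather than proof. A small sketch, assuming a POSIX system; `describe_exit_code` is a hypothetical helper.

```python
import signal


def describe_exit_code(code: int) -> str:
    """Decode a shell-style exit status (POSIX convention assumed)."""
    if code > 128:
        try:
            sig = signal.Signals(code - 128)
        except ValueError:
            return f"exited with status {code} (unknown signal {code - 128})"
        return f"killed by {sig.name}"
    return f"exited with status {code}"
```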
   
   
   
   




[GitHub] [airflow] XD-DENG commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1343131210

   Thanks @potiuk . Yes, I believe the same after double-checking the CI logs (especially the exit code 137).
   
   I'm trying to use Breeze to reproduce everything locally, but I'm having a problem running it: it always gives me the error below:
   
   ```
   Docker filesystem: apfs
   Good version of Docker: 20.10.8.
   Good version of docker-compose: 1.29.2
   Good Docker context used: default.
   Removing network airflow-test-all_default
   Creating network "airflow-test-all_default" with the default driver
   Creating airflow-test-all_airflow_run ... done
   
   Running Initialization. Your basic configuration is:
   
     * Airflow home: /root/airflow
     * Airflow sources: /opt/airflow
     * Airflow core SQL connection: 
   
   
   Using airflow version from current sources
   
   
   Checking backend and integrations.
   
   
   
   Skip fixing ownership of generated files as Host OS is darwin
   
   
   Running tests tests
   
   
   Starting the tests with those pytest arguments: --verbosity=0 --strict-markers --durations=100 --maxfail=50 --color=yes --junitxml=/files/test_result-All-sqlite.xml --timeouts-order moi --setup-timeout=60 --execution-timeout=60 --teardown-timeout=60 --output=/files/warnings-All-sqlite.txt --disable-warnings -rfEX --with-db-init tests
   
   ERROR: usage: pytest [options] [file_or_dir] [file_or_dir] [...]
   pytest: error: unrecognized arguments: --output=/files/warnings-All-sqlite.txt
     inifile: /opt/airflow/pytest.ini
     rootdir: /opt/airflow
   
   ERROR: 4
   No stopped containers
   
   ```
   
   Do you have any suggestion for that?
   
   Meanwhile I will keep checking on my end. Thanks again
   
   




[GitHub] [airflow] dstandish commented on a diff in pull request #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1037857487


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -288,15 +294,33 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:

Review Comment:
   ```suggestion
       def _make_kube_watchers(self) -> dict[str, KubernetesJobWatcher]:
   ```





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1040550344


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -287,15 +293,35 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [None]
             )
-            ResourceVersion().resource_version = "0"
-            self.kube_watcher = self._make_kube_watcher()
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers

Review Comment:
   > "ALL_NAMESPACES" is not legal as K8s namespace, so we don't have to worry about name collision with a real namespace.
   
   Nice, that's great.  A little more descriptive, and very convenient that there's no risk of collision.  
   
   re the other stuff, no worries...  and... sorry if they are too "code golfy" ... just... was looking at how to maybe streamline some parts of it... just suggestions
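
A sketch of why an `"ALL_NAMESPACES"` sentinel cannot collide with a real namespace: Kubernetes namespace names must be RFC 1123 DNS labels (lowercase alphanumerics and `-`, at most 63 characters), so a string with uppercase letters and an underscore is never valid. The helper names below are hypothetical, and the selection logic mirrors the `_make_kube_watchers` diff with the sentinel substituted for `None`.

```python
from __future__ import annotations

import re

# Sentinel key meaning "watch every namespace" (name taken from the quoted comment).
ALL_NAMESPACES = "ALL_NAMESPACES"

# RFC 1123 label: lowercase alphanumerics or '-', starting and ending alphanumeric.
_DNS_LABEL = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?")


def is_valid_namespace_name(name: str) -> bool:
    return len(name) <= 63 and _DNS_LABEL.fullmatch(name) is not None


def namespaces_to_watch(
    multi_namespace_mode: bool,
    namespace_list: list[str] | None,
    kube_namespace: str,
) -> list[str]:
    """Hypothetical helper choosing the watcher keys."""
    if not multi_namespace_mode:
        return [kube_namespace]
    return list(namespace_list) if namespace_list else [ALL_NAMESPACES]
```

With a string sentinel, the watcher dict can be typed as `dict[str, KubernetesJobWatcher]` without `None` in the key type.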





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1044096352


##########
airflow/config_templates/config.yml:
##########
@@ -2422,11 +2422,21 @@
     - name: multi_namespace_mode
       description: |
         Allows users to launch pods in multiple namespaces.
-        Will require creating a cluster-role for the scheduler
+        Will require creating a cluster-role for the scheduler,
+        or use multi_namespace_mode_namespace_list configuration.
       version_added: 1.10.12
       type: boolean
       example: ~
       default: "False"
+    - name: multi_namespace_mode_namespace_list
+      description: |
+        If multi_namespace_mode is True while scheduler does not have a cluster-role,
+        give the list of namespaces where the scheduler will schedule jobs
+        Scheduler needs to have the necessary permissions in these namespaces.
+      version_added: 2.5.1

Review Comment:
   Updated to 2.6.0 at https://github.com/apache/airflow/pull/28047/commits/974e34521e6d32db7f50ca810c1f602e918f8eec
   
   Continuing to check the CI failures.





[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1344333515

   BTW. You can also locally do this:
   
   ```
   breeze testing tests --run-in-parallel
   ```
   
   This will do EXACTLY what is being done in the CI: it will run all the tests in parallel in the same way as in CI. If you have a big enough local machine (> 32 GB for Docker, 8 CPUs), it will run with the same number of parallel tests as we run in CI, so if there is any side effect of those tests running in parallel (there should not be, because each of them uses a separately started `docker-compose`), you will be able to see it there yourself (i.e. how memory usage grows when you run the tests in parallel).
   
   This should enable you to fully reproduce locally what you see in CI.
   




[GitHub] [airflow] uranusjr commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042990135


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -66,7 +68,7 @@ class ResourceVersion:
     """Singleton for tracking resourceVersion from Kubernetes."""
 
     _instance = None
-    resource_version = "0"
+    resource_version: dict[str, str] = {}

Review Comment:
   We can probably improve this class further, but that can be a separate PR.





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041863734


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -140,9 +141,14 @@ def _run(
 
         last_resource_version: str | None = None
         if self.multi_namespace_mode:
-            list_worker_pods = functools.partial(
-                watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
-            )
+            if self.kube_config.multi_namespace_mode_namespace_list:
+                list_worker_pods = functools.partial(
+                    watcher.stream, kube_client.list_namespaced_pod, self.namespace, **kwargs
+                )
+            else:
+                list_worker_pods = functools.partial(
+                    watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
+                )

Review Comment:
   yeah, similar to below, i guess the idea behind the suggestion is that, code reuse isn't the only reason to extract logic into a method. sometimes it just makes the code easier to read... so you just see `for pod in list_pods()` rather than a bunch of logic in an if block.
   
   again, feel free to disregard, just suggestion
   
   side note though, i don't think there's any need to use functools.partial here, i don't think it is doing anything different from just calling stream() here, which returns a generator, and iterating that later.  so i think we can get rid of that (i know that's just how it was)
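
The side note about `functools.partial` can be checked with stand-ins. In this sketch `stream` and `list_pods` are hypothetical stubs; the only assumption carried over from the comment is that the stream function is a generator, so calling it runs nothing until iteration starts.

```python
import functools

calls = []


def stream(func, *args, **kwargs):
    """Generator stand-in for the watcher's stream: lazy until iterated."""
    calls.append("started")
    yield from func(*args, **kwargs)


def list_pods(namespace):
    return [f"{namespace}/pod-1", f"{namespace}/pod-2"]


# Original shape: build a partial now, call it later to obtain the generator.
via_partial = functools.partial(stream, list_pods, "ns-a")
# Suggested shape: call stream() directly; the body still has not run.
direct = stream(list_pods, "ns-a")
calls_before_iteration = list(calls)

partial_events = list(via_partial())
direct_events = list(direct)
```

One difference remains: the partial can be called again to get a fresh stream, while a generator can only be consumed once. That matters only if the stream is re-created in the same scope.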





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041770466


##########
airflow/kubernetes/kube_config.py:
##########
@@ -57,6 +57,14 @@ def __init__(self):
         # create, watch, get, and delete pods in this namespace.
         self.kube_namespace = conf.get(self.kubernetes_section, "namespace")
         self.multi_namespace_mode = conf.getboolean(self.kubernetes_section, "multi_namespace_mode")
+        if self.multi_namespace_mode and conf.get(
+            self.kubernetes_section, "multi_namespace_mode_namespace_list"
+        ):
+            self.multi_namespace_mode_namespace_list = conf.get(
+                self.kubernetes_section, "multi_namespace_mode_namespace_list"
+            ).split(",")
+        else:
+            self.multi_namespace_mode_namespace_list = None

Review Comment:
   sure
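
For reference, the parsing in the hunk above can be sketched as a standalone helper. The function name `parse_namespace_list` is hypothetical, and stripping whitespace and dropping empty entries is an assumption that goes beyond the plain `split(",")` in the diff.

```python
from __future__ import annotations


def parse_namespace_list(multi_namespace_mode: bool, raw_value: str | None) -> list[str] | None:
    """Return the configured namespaces, or None when the list does not apply."""
    if not multi_namespace_mode or not raw_value:
        return None
    # Assumption: tolerate spaces and trailing commas; the diff only calls split(",").
    namespaces = [part.strip() for part in raw_value.split(",") if part.strip()]
    return namespaces or None


parsed = parse_namespace_list(True, "team-a, team-b,")
```

Reading the raw value once and passing it in also avoids the double `conf.get(...)` lookup seen in the hunk.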





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041779481


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -497,16 +541,16 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
 
             # Try run_id first
             kwargs["label_selector"] += ",run_id=" + pod_generator.make_safe_label_value(ti.run_id)
-            pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
-            if pod_list.items:
+            pod_list = self._list_pods(kwargs)
+            if pod_list:
                 continue
             # Fallback to old style of using execution_date
             kwargs["label_selector"] = (
                 f"{base_label_selector},"
                 f"execution_date={pod_generator.datetime_to_label_safe_datestring(ti.execution_date)}"
             )
-            pod_list = self.kube_client.list_namespaced_pod(self.kube_config.kube_namespace, **kwargs)
-            if pod_list.items:
+            pod_list = self._list_pods(kwargs)

Review Comment:
   The lines being changed here seem outdated? They are no longer there in the latest version of this PR.
   
   Regarding the change itself, please refer to my question above.





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042918552


##########
tests/executors/test_kubernetes_executor.py:
##########
@@ -1108,9 +1209,9 @@ def effect():
             except Exception as e:
                 assert e.args == ("sentinel",)
 
-            # both  resource_version should be 0 after _run raises and exception
+            # both resource_version should be 0 after _run raises and exception

Review Comment:
   ```suggestion
               # both resource_version should be 0 after _run raises an exception
   ```





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1038613503


##########
airflow/config_templates/default_airflow.cfg:
##########
@@ -1197,9 +1197,14 @@ delete_worker_pods_on_failure = False
 worker_pods_creation_batch_size = 1
 
 # Allows users to launch pods in multiple namespaces.
-# Will require creating a cluster-role for the scheduler
+# Will require creating a cluster-role for the scheduler, or use namespace_list configuration.
 multi_namespace_mode = False
 
+# If multi_namespace_mode is True while scheduler does not have a cluster-role,
+# give the list of namespaces where the scheduler will schedule jobs
+# Scheduler needs to have the necessary permissions in these namespaces.
+namespace_list =

Review Comment:
   maybe this would be better.  
   
   ```suggestion
   multi_namespace_mode_namespace_list =
   ```
   it's really a sub-option of multi_namespace_mode.
   
   wdyt?





[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1343186130

   Yep. Rebuild the image :)
   
   `breeze ci-image build`




[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042910236


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -444,6 +471,23 @@ def __init__(self):
         self.kubernetes_queue: str | None = None
         super().__init__(parallelism=self.kube_config.parallelism)
 
+    def _list_pods(self, query_kwargs):
+        if self.kube_config.multi_namespace_mode:
+            if self.kube_config.multi_namespace_mode_namespace_list:
+                pods = []
+                for namespace in self.kube_config.multi_namespace_mode_namespace_list:
+                    pods.extend(
+                        self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
+                    )
+            else:
+                pods = self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
+        else:
+            pods = self.kube_client.list_namespaced_pod(
+                namespace=self.kube_config.kube_namespace, **query_kwargs
+            ).items
+
+        return pods
+

Review Comment:
   For this part, I personally find the current code easier to comprehend, and it has no disadvantage in terms of performance. So I would prefer not to change this part.
   
   Please let me know if you find it ok? Thanks
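
For readers following the thread, the three branches of `_list_pods` quoted above can be exercised in isolation. This is only a sketch: `FakeKubeClient` is a hypothetical in-memory stand-in that mimics just the two client calls used in the diff, and the config object is a plain `SimpleNamespace`, not Airflow's `KubeConfig`.

```python
from types import SimpleNamespace


class FakeKubeClient:
    """Hypothetical stand-in; mimics only the two listing calls used in the diff."""

    def __init__(self, pods_by_namespace):
        self.pods_by_namespace = pods_by_namespace

    def list_namespaced_pod(self, namespace, **query_kwargs):
        return SimpleNamespace(items=list(self.pods_by_namespace.get(namespace, [])))

    def list_pod_for_all_namespaces(self, **query_kwargs):
        all_pods = [pod for pods in self.pods_by_namespace.values() for pod in pods]
        return SimpleNamespace(items=all_pods)


def list_pods(kube_client, kube_config, query_kwargs):
    """Same three branches as `_list_pods`, written as a free function."""
    if kube_config.multi_namespace_mode:
        if kube_config.multi_namespace_mode_namespace_list:
            # Namespace-scoped permissions: one call per listed namespace.
            pods = []
            for namespace in kube_config.multi_namespace_mode_namespace_list:
                pods.extend(kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items)
            return pods
        # Cluster-scoped permissions: one call for the whole cluster.
        return kube_client.list_pod_for_all_namespaces(**query_kwargs).items
    return kube_client.list_namespaced_pod(namespace=kube_config.kube_namespace, **query_kwargs).items


client = FakeKubeClient({"team-a": ["a1"], "team-b": ["b1", "b2"], "other": ["o1"]})
restricted = SimpleNamespace(
    multi_namespace_mode=True,
    multi_namespace_mode_namespace_list=["team-a", "team-b"],
    kube_namespace="team-a",
)
cluster_wide = SimpleNamespace(
    multi_namespace_mode=True,
    multi_namespace_mode_namespace_list=None,
    kube_namespace="team-a",
)
pods_restricted = list_pods(client, restricted, {"label_selector": "x=y"})
pods_cluster_wide = list_pods(client, cluster_wide, {"label_selector": "x=y"})
```

With the namespace list set, pods in `other` are never requested, which is the point of the change for schedulers without a cluster-scope role.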





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041778739


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -444,6 +471,23 @@ def __init__(self):
         self.kubernetes_queue: str | None = None
         super().__init__(parallelism=self.kube_config.parallelism)
 
+    def _list_pods(self, query_kwargs):
+        if self.kube_config.multi_namespace_mode:
+            if self.kube_config.multi_namespace_mode_namespace_list:
+                pods = []
+                for namespace in self.kube_config.multi_namespace_mode_namespace_list:
+                    pods.extend(
+                        self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
+                    )
+            else:
+                pods = self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
+        else:
+            pods = self.kube_client.list_namespaced_pod(
+                namespace=self.kube_config.kube_namespace, **query_kwargs
+            ).items
+
+        return pods
+

Review Comment:
   Sorry, I don't see an obvious improvement/advantage from this change. Could you clarify a bit more? Otherwise I believe the existing implementation is clearer IMHO.





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042901881


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -287,15 +293,35 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [None]
             )
-            ResourceVersion().resource_version = "0"
-            self.kube_watcher = self._make_kube_watcher()
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers
+
+    def _health_check_kube_watchers(self):
+        for namespace, kube_watcher in self.kube_watchers.items():
+            if kube_watcher.is_alive():
+                self.log.debug("KubeJobWatcher for namespace %s alive, continuing", namespace)
+            else:
+                self.log.error(
+                    (
+                        "Error while health checking kube watcher process for namespace %s. "
+                        "Process died for unknown reasons"
+                    ),
+                    namespace,
+                )
+                ResourceVersion().resource_version[namespace] = "0"
+                self.kube_watchers[namespace] = self._make_kube_watcher(namespace)
 
     def run_next(self, next_job: KubernetesJobType) -> None:

Review Comment:
   Our team is using KubernetesExecutor, and we do run it with a multi-namespace setting. But we couldn't use vanilla `multi_namespace_mode` as it is. That's exactly why I'm proposing the changes in this PR ;-)
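
The namespace selection inside `_make_kube_watchers` in the hunk above can be sketched as a pure function. The name `namespaces_to_watch` as a free function and the string placeholders for watcher processes are assumptions made for illustration.

```python
from __future__ import annotations


def namespaces_to_watch(
    multi_namespace_mode: bool,
    namespace_list: list[str] | None,
    kube_namespace: str,
) -> list[str | None]:
    """Return one entry per watcher that should be started."""
    if multi_namespace_mode:
        # A configured list means one watcher per namespace;
        # otherwise a single cluster-wide watcher keyed by None.
        return list(namespace_list) if namespace_list else [None]
    return [kube_namespace]


# One watcher per key; strings stand in for KubernetesJobWatcher processes.
watchers = {
    ns: f"watcher-for-{ns}"
    for ns in namespaces_to_watch(True, ["team-a", "team-b"], "default")
}
```

Keying the dictionary by namespace is what lets the health check restart only the watcher that died.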





[GitHub] [airflow] XD-DENG merged pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG merged PR #28047:
URL: https://github.com/apache/airflow/pull/28047




[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1344350801

   Yep @XD-DENG - precisely as I expected:
   
   At some point those tests start eating memory at the rate of > 1GB / 10 seconds. Just before the crash we have just 800 MB left. Something spins out of control and causes that - this only happens on your change, not on any of the other PRs or main - so it **MUST** be something here that triggers it. Now we just need to find out what it might be:
   
   ![image](https://user-images.githubusercontent.com/595491/206719600-6f5bf3a7-0d4d-46c7-983b-a6b05aade407.png)
   




[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1038652343


##########
airflow/config_templates/default_airflow.cfg:
##########
@@ -1197,9 +1197,14 @@ delete_worker_pods_on_failure = False
 worker_pods_creation_batch_size = 1
 
 # Allows users to launch pods in multiple namespaces.
-# Will require creating a cluster-role for the scheduler
+# Will require creating a cluster-role for the scheduler, or use namespace_list configuration.
 multi_namespace_mode = False
 
+# If multi_namespace_mode is True while scheduler does not have a cluster-role,
+# give the list of namespaces where the scheduler will schedule jobs
+# Scheduler needs to have the necessary permissions in these namespaces.
+namespace_list =

Review Comment:
   Addressed with commit https://github.com/apache/airflow/pull/28047/commits/1c1f25d92e0faeb4f84069056dbdc24c1e53fa93





[GitHub] [airflow] XD-DENG commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1341972985

   > i just noticed something coincidentally... you may want to look at file task handler. when reading logs from k8s task, seems to assume single namespace
   
   Thanks @dstandish , yes, I have already noticed that earlier. I plan to address that separately later though if by then other folks haven't taken it up. That part was totally missed when this `multi_namespace_mode` was introduced (again, questioning how people are using this feature so far)




[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1343123005

   This looks like it's coming from one of your changes @XD-DENG. It's hard to say what precisely it is, but one of your tests is likely exhausting all 64 GB of memory that the builders have (that's what error 137 is). It happens very consistently on all your tests, and it does not seem to be happening on others.
   
   You can easily reproduce those builds locally with Breeze and see what happens: https://github.com/apache/airflow/blob/main/TESTING.rst#running-tests-using-breeze-from-the-host - seems like the "Core" test type is failing. The main reason for having Breeze is that you should be able to reproduce it locally.
   
   If not for the memory exhaustion you'd also get the instructions on how to do it - but basically it looks like some of the core tests fail and you could easily attempt to reproduce it. The hash is in the CI logs:
   
   This will get you to the shell of the image that was used in the last CI build here:
   
   ```
   breeze shell --image-tag b64f96bc9e0cb67cd7b75baad3933638c23e4935 
   ```
   
   And you should be able to run the tests there.




[GitHub] [airflow] dstandish commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1343141079

   and... maybe not but just a quick thought 




[GitHub] [airflow] dstandish commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1343173412

   > > maybe we don't need multiple watchers. maybe a single watcher can watch multiple namespaces.
   > 
   > I did think through this idea earlier, but I couldn't think of a way to use the existing K8S API to efficiently monitor multiple namespaces.
   
   Maybe with threads? just spitballing here




[GitHub] [airflow] XD-DENG commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1344546028

   Thanks a lot @potiuk  for helping check and sharing the Breeze/CI tips!
   I will do another check later today.




[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041863734


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -140,9 +141,14 @@ def _run(
 
         last_resource_version: str | None = None
         if self.multi_namespace_mode:
-            list_worker_pods = functools.partial(
-                watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
-            )
+            if self.kube_config.multi_namespace_mode_namespace_list:
+                list_worker_pods = functools.partial(
+                    watcher.stream, kube_client.list_namespaced_pod, self.namespace, **kwargs
+                )
+            else:
+                list_worker_pods = functools.partial(
+                    watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
+                )

Review Comment:
   yeah, similar to below, i guess the idea behind the suggestion is that, code reuse isn't the only reason to extract logic into a method. sometimes it just makes the code easier to read... so you just see `for pod in list_pods()` rather than a bunch of logic in an if block.
   
   again, feel free to disregard, just suggestion
   
   side note though, i don't think there's any need to use functools.partial here.  i think we can get rid of that (i know that's just how it was)





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042902126


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -287,15 +293,35 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [None]
             )
-            ResourceVersion().resource_version = "0"
-            self.kube_watcher = self._make_kube_watcher()
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers
+
+    def _health_check_kube_watchers(self):
+        for namespace, kube_watcher in self.kube_watchers.items():
+            if kube_watcher.is_alive():
+                self.log.debug("KubeJobWatcher for namespace %s alive, continuing", namespace)
+            else:
+                self.log.error(
+                    (
+                        "Error while health checking kube watcher process for namespace %s. "
+                        "Process died for unknown reasons"
+                    ),
+                    namespace,
+                )
+                ResourceVersion().resource_version[namespace] = "0"
+                self.kube_watchers[namespace] = self._make_kube_watcher(namespace)
 
     def run_next(self, next_job: KubernetesJobType) -> None:

Review Comment:
   > It looks like maybe if you don't set namespace in airflow cfg then you may be able to set it with executor config
   
   And yes, I believe this is the answer to the original question you shared above.





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042932930


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -79,7 +81,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 
     def __init__(
         self,
-        namespace: str | None,
+        namespace: str,
         multi_namespace_mode: bool,

Review Comment:
   Nice catch! Addressed with https://github.com/apache/airflow/pull/28047/commits/742468ef37a0725d657ac4790f9c3daf22df9766





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1037854334


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -66,7 +67,7 @@ class ResourceVersion:
     """Singleton for tracking resourceVersion from Kubernetes"""
 
     _instance = None
-    resource_version = "0"
+    resource_version: dict[str | None, str | None] = {}

Review Comment:
   does this not work?
   ```suggestion
       resource_version: dict[str, str] = {}
   ```
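
As a rough sketch of what the suggestion implies, the singleton can hold a plain `dict[str, str]` if the cluster-wide watcher gets a string key instead of `None`. The sentinel name `ALL_NAMESPACES` below is an assumption for illustration and is not taken from the diff above.

```python
from __future__ import annotations

ALL_NAMESPACES = "ALL_NAMESPACES"  # hypothetical sentinel key for the cluster-wide watcher


class ResourceVersion:
    """Singleton holding the last seen resourceVersion per namespace."""

    _instance = None
    resource_version: dict[str, str] = {}

    def __new__(cls):
        # Always hand back the same instance so every caller shares one mapping.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance


ResourceVersion().resource_version["team-a"] = "42"
ResourceVersion().resource_version[ALL_NAMESPACES] = "7"
# Reset one namespace, as the health check does when a watcher dies.
ResourceVersion().resource_version["team-a"] = "0"
```

With a string sentinel, neither the keys nor the values need to be optional, which is what the narrower annotation expresses.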





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042909354


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -140,9 +141,14 @@ def _run(
 
         last_resource_version: str | None = None
         if self.multi_namespace_mode:
-            list_worker_pods = functools.partial(
-                watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
-            )
+            if self.kube_config.multi_namespace_mode_namespace_list:
+                list_worker_pods = functools.partial(
+                    watcher.stream, kube_client.list_namespaced_pod, self.namespace, **kwargs
+                )
+            else:
+                list_worker_pods = functools.partial(
+                    watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
+                )

Review Comment:
   Thanks again @dstandish . This part makes sense to me. I added commit https://github.com/apache/airflow/pull/28047/commits/9eaeede8dabc8f823d113ecb85112e43c31a2a8e to introduce the private method `_pod_events`, and get rid of `functools.partial()`. Let me know if you find it ok?
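
A helper along these lines might look like the sketch below. It is not the code from the linked commit: `FakeWatch` and `FakeClient` are hypothetical stand-ins for the Kubernetes watch and client objects, and treating `None` as "watch the whole cluster" is an assumption.

```python
class FakeWatch:
    """Hypothetical stand-in for a watch object; yields one event per listed pod."""

    def stream(self, list_func, *args, **kwargs):
        for pod in list_func(*args, **kwargs):
            yield {"type": "ADDED", "object": pod}


class FakeClient:
    """Hypothetical stand-in for the two listing calls discussed in the review."""

    def list_namespaced_pod(self, namespace, **kwargs):
        return [f"{namespace}/pod-1"]

    def list_pod_for_all_namespaces(self, **kwargs):
        return ["ns-a/pod-1", "ns-b/pod-1"]


def pod_events(kube_client, watcher, namespace, query_kwargs):
    """Pick the listing call once and return the lazy event stream."""
    if namespace is None:  # assumption: None means cluster-wide watching
        return watcher.stream(kube_client.list_pod_for_all_namespaces, **query_kwargs)
    return watcher.stream(kube_client.list_namespaced_pod, namespace, **query_kwargs)


namespaced_events = list(pod_events(FakeClient(), FakeWatch(), "ns-a", {"label_selector": "x"}))
cluster_events = list(pod_events(FakeClient(), FakeWatch(), None, {"label_selector": "x"}))
```

The caller then simply iterates the returned stream, so the branching stays out of the main loop.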





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042925800


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -79,7 +81,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 
     def __init__(
         self,
-        namespace: str | None,
+        namespace: str,
         multi_namespace_mode: bool,

Review Comment:
   i think this can be removed now, along with its attr





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1044092780


##########
airflow/config_templates/config.yml:
##########
@@ -2422,11 +2422,21 @@
     - name: multi_namespace_mode
       description: |
         Allows users to launch pods in multiple namespaces.
-        Will require creating a cluster-role for the scheduler
+        Will require creating a cluster-role for the scheduler,
+        or use multi_namespace_mode_namespace_list configuration.
       version_added: 1.10.12
       type: boolean
       example: ~
       default: "False"
+    - name: multi_namespace_mode_namespace_list
+      description: |
+        If multi_namespace_mode is True while scheduler does not have a cluster-role,
+        give the list of namespaces where the scheduler will schedule jobs
+        Scheduler needs to have the necessary permissions in these namespaces.
+      version_added: 2.5.1

Review Comment:
   i think it will need to go in 2.6 since it's a new feature





[GitHub] [airflow] XD-DENG commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1342948483

   Thanks both @dstandish  @uranusjr ! The whole PR has improved a lot through the review.
   
   May I also get some input from you about the failing test? The log isn't helping much.
   Is there any known issue with the test pipeline, or is something just missed/wrong here? Thanks a lot




[GitHub] [airflow] dstandish commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1343174039

   > > > maybe we don't need multiple watchers. maybe a single watcher can watch multiple namespaces.
   > > 
   > > 
   > > I did think this idea through earlier, but I couldn't think of a way to use the existing K8S API to efficiently monitor multiple namespaces.
   > 
   > Maybe with threads? just spitballing here
   
   or maybe we use multiple watchers, but run them in threads instead of processes
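
   As a rough sketch only of the "multiple watchers, but in threads" idea: each namespace gets its own thread, and all of them push events onto one shared queue. `fake_watch`, `watch_namespace` and `run_watchers` are hypothetical names for illustration; this is not the actual `KubernetesJobWatcher` code, and a real watch stream would be long-running rather than finite.

```python
import queue
import threading


def fake_watch(namespace):
    """Hypothetical stand-in for a per-namespace Kubernetes watch stream."""
    for i in range(3):
        yield {"namespace": namespace, "pod": f"pod-{i}"}


def watch_namespace(namespace, events):
    # Each thread drains one namespace's stream into the shared queue.
    for event in fake_watch(namespace):
        events.put(event)


def run_watchers(namespaces):
    events = queue.Queue()
    threads = [
        threading.Thread(target=watch_namespace, args=(ns, events), daemon=True)
        for ns in namespaces
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # All producers have finished, so the queue can be drained safely.
    collected = []
    while not events.empty():
        collected.append(events.get())
    return collected
```

   Whether this would actually lower memory use compared with one process per namespace is an open question in the thread, not something this sketch demonstrates.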




[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1344327042

   See for example in the "wait for CI images" job:
   
   ![image](https://user-images.githubusercontent.com/595491/206715855-88d4bb32-6a0a-4947-bc08-1c3208d103b1.png)
   




[GitHub] [airflow] XD-DENG commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1343915815

   Hi @potiuk, while preparing this PR I have been using Breeze to run the tests locally, especially the `Core` tests, as that's what this PR mainly (or only) touches. They have been passing locally. I have tried running them locally with Breeze again, and they still succeed (`breeze testing tests --test-types Core --run-in-parallel --parallelism 32`)
   
   <img width="1093" alt="Screenshot 2022-12-08 at 22 38 28" src="https://user-images.githubusercontent.com/11539188/206640682-bf79c97b-110b-4f04-b305-3b78edae943a.png">
   
   But it always fails in CI.
   
   Is there any further input you can share to help debug this? In particular, is there a way to locate which test class/unit is causing the failure? Thanks a lot!
   
   @dstandish @uranusjr if you have any tips that may help here, kindly let me know. Cheers!




[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list to avoid requiring cluster role

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1345190002

   Actually I think we need a bit more protection, otherwise the same situation happens if for any reason those asserts start to raise exceptions. Using try/finally and making sure that we always .end() after we .start() across all the k8s tests is a much more robust solution.
   
   Follow up here https://github.com/apache/airflow/pull/28281 @XD-DENG 
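
   A minimal sketch of the try/finally pattern described above. `FakeExecutor` and `run_check` are hypothetical stand-ins for illustration, not Airflow classes or test helpers; the point is only that `end()` runs even when an assertion inside the test body raises.

```python
class FakeExecutor:
    """Hypothetical stand-in for an executor that owns background processes."""

    def __init__(self):
        self.running = False

    def start(self):
        self.running = True

    def end(self):
        self.running = False


def run_check(executor, check):
    executor.start()
    try:
        check(executor)
    finally:
        # Runs even if the check raises, so nothing is left running.
        executor.end()
```

   With this shape, a failing assertion still propagates to the test runner, but the executor is shut down first.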




[GitHub] [airflow] dstandish commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1341978760

   > . That part was totally missed when this `multi_namespace_mode` was introduced (again, questioning how people are using this feature so far)
   
   sounds good




[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1344354789

   My guess is that this is a problem with cleanup. You have a LOT of parameterized tests and they are not cleaned up after each test.






[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1344336019

   More options are available (such as `--debug-resources`) in the `--help` output:
   
   ![image](https://user-images.githubusercontent.com/595491/206717640-58f49ae6-dc5b-4965-ba81-d5bcf2ac66cf.png)
   




[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1040497339


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -287,15 +293,35 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [None]
             )
-            ResourceVersion().resource_version = "0"
-            self.kube_watcher = self._make_kube_watcher()
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers

Review Comment:
   Hi @dstandish, regarding the key, I added https://github.com/apache/airflow/pull/28047/commits/928b8399cad70c16c451096cf28a5222c00db084 . The key will be `ALL_NAMESPACES` when `namespace_list` is not given and `multi_namespace_mode=True`. `"ALL_NAMESPACES"` is not a legal K8s namespace name, so we don't have to worry about a name collision with a real namespace.
   
   Let me know what you think of it, compared with `None`.
   
   Regarding your other comments, I will need more time to get back to you (I actually haven't had time to read them in detail yet; busy with my day job). I appreciate the review and the suggestions!
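
   To illustrate why a sentinel key like `ALL_NAMESPACES` cannot collide with a real namespace, here is a small sketch. It assumes the usual Kubernetes rule that namespace names are RFC 1123 DNS labels; `is_valid_namespace_name` and `watcher_keys` are hypothetical helpers written for this example, not code from the PR.

```python
import re

ALL_NAMESPACES = "ALL_NAMESPACES"

# RFC 1123 label: lowercase alphanumerics or "-", alphanumeric at both ends.
DNS_LABEL = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?")


def is_valid_namespace_name(name):
    # Labels are also limited to 63 characters.
    return len(name) <= 63 and DNS_LABEL.fullmatch(name) is not None


def watcher_keys(multi_namespace_mode, namespace_list, default_namespace):
    # One key per watcher; the sentinel stands for "watch the whole cluster".
    if not multi_namespace_mode:
        return [default_namespace]
    return list(namespace_list) if namespace_list else [ALL_NAMESPACES]
```

   Because the sentinel contains uppercase letters and an underscore, it fails the label check, so it is safe to use as a dictionary key next to real namespace names.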





[GitHub] [airflow] XD-DENG commented on pull request #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1334704956

   Hi @potiuk and @ferruzzi tagging you both, to follow up [our earlier discussion in the mail list](https://lists.apache.org/thread/xxsppw7qwvky78l6nx41vlz593gj4zqb).




[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041740148


##########
airflow/kubernetes/kube_config.py:
##########
@@ -57,6 +57,14 @@ def __init__(self):
         # create, watch, get, and delete pods in this namespace.
         self.kube_namespace = conf.get(self.kubernetes_section, "namespace")
         self.multi_namespace_mode = conf.getboolean(self.kubernetes_section, "multi_namespace_mode")
+        if self.multi_namespace_mode and conf.get(
+            self.kubernetes_section, "multi_namespace_mode_namespace_list"
+        ):
+            self.multi_namespace_mode_namespace_list = conf.get(
+                self.kubernetes_section, "multi_namespace_mode_namespace_list"
+            ).split(",")
+        else:
+            self.multi_namespace_mode_namespace_list = None

Review Comment:
   Hi @dstandish, I'm not sure if this is the correct thing to do.
   
   I think it's easier to allow users to pass a value like `ns-a,ns-b,ns-c` to `multi_namespace_mode_namespace_list`, rather than asking them to put something like `"ns-a","ns-b","ns-c"`. So treating it as JSON or a list may not make sense IMHO, and I would suggest we keep the existing way.
   
   Let me know what you think.
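
   For reference, the comma-separated form can be parsed with a few lines. This is a sketch under assumptions: `parse_namespace_list` is a hypothetical helper, and stripping whitespace around each name goes beyond the plain `.split(",")` in the diff above.

```python
def parse_namespace_list(raw):
    """Split a comma-separated option value into names, or return None if empty."""
    if not raw:
        return None
    # Stripping and dropping empty parts is an extra tolerance, not in the diff.
    names = [part.strip() for part in raw.split(",") if part.strip()]
    return names or None
```

   Returning `None` for an empty value mirrors the `else` branch in the diff, where the attribute is set to `None` when no list is configured.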





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041781541


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -287,15 +293,35 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [None]
             )
-            ResourceVersion().resource_version = "0"
-            self.kube_watcher = self._make_kube_watcher()
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers
+
+    def _health_check_kube_watchers(self):
+        for namespace, kube_watcher in self.kube_watchers.items():
+            if kube_watcher.is_alive():
+                self.log.debug("KubeJobWatcher for namespace %s alive, continuing", namespace)
+            else:
+                self.log.error(
+                    (
+                        "Error while health checking kube watcher process for namespace %s. "
+                        "Process died for unknown reasons"
+                    ),
+                    namespace,
+                )
+                ResourceVersion().resource_version[namespace] = "0"
+                self.kube_watchers[namespace] = self._make_kube_watcher(namespace)
 
     def run_next(self, next_job: KubernetesJobType) -> None:

Review Comment:
   @dstandish, that's a good question, and thanks for the reminder.
   
   Actually, while preparing this PR, I kept having this question: did this `multi_namespace_mode` feature ever really work for anyone? I also noticed a few similar issues, and I shared them at https://lists.apache.org/thread/nqnq2gbvtx6fyycsqhgjhlz1yotq0vhc





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041850043


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -444,6 +471,23 @@ def __init__(self):
         self.kubernetes_queue: str | None = None
         super().__init__(parallelism=self.kube_config.parallelism)
 
+    def _list_pods(self, query_kwargs):
+        if self.kube_config.multi_namespace_mode:
+            if self.kube_config.multi_namespace_mode_namespace_list:
+                pods = []
+                for namespace in self.kube_config.multi_namespace_mode_namespace_list:
+                    pods.extend(
+                        self.kube_client.list_namespaced_pod(namespace=namespace, **query_kwargs).items
+                    )
+            else:
+                pods = self.kube_client.list_pod_for_all_namespaces(**query_kwargs).items
+        else:
+            pods = self.kube_client.list_namespaced_pod(
+                namespace=self.kube_config.kube_namespace, **query_kwargs
+            ).items
+
+        return pods
+

Review Comment:
   yeah, not really important; sometimes I just think generators are cleaner than managing the data structures
   
   if you want to be really picky you could say that, in the case where you are using a namespace list, using the generator will sometimes save a few API calls because if, say, you find the pod in the first namespace, you don't need to try the others.
   
   but I think mainly my motivation was just separation of the logic to determine namespaces from the logic to list the pods, because smaller functions can be easier to understand.
   
   but feel free to ignore.
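
   A sketch of the generator approach described above, under stated assumptions: `FakeClient`, `namespaces_to_query` and `iter_pods` are hypothetical names, the fake client returns a plain list rather than an object with `.items`, and the all-namespaces branch is left out for brevity.

```python
class FakeClient:
    """Hypothetical stand-in that records which namespaces were queried."""

    def __init__(self, pods_by_namespace):
        self.pods_by_namespace = pods_by_namespace
        self.calls = []

    def list_namespaced_pod(self, namespace):
        self.calls.append(namespace)
        return self.pods_by_namespace.get(namespace, [])


def namespaces_to_query(multi_namespace_mode, namespace_list, default_namespace):
    # Deciding where to look is kept apart from the listing itself.
    if multi_namespace_mode and namespace_list:
        return namespace_list
    return [default_namespace]


def iter_pods(client, namespaces):
    # Lazily yields pods; a namespace is queried only when the caller needs it.
    for namespace in namespaces:
        yield from client.list_namespaced_pod(namespace)
```

   If the caller stops after the first match, later namespaces are never queried, which is the saving in API calls mentioned in the comment.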





[GitHub] [airflow] dstandish commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1342006624

   there is nothing slow about it at all... lemme look




[GitHub] [airflow] dstandish commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1343135530

   @XD-DENG @potiuk this gives me an idea. maybe we don't need multiple watchers. maybe a single watcher can watch multiple namespaces.




[GitHub] [airflow] XD-DENG commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1341982373

   Hi @dstandish, I tried to address your earlier comments (very nice input!): either changes have been made, or I have clarified why I would prefer to do it differently. Please let me know your thoughts.
   
   (Sorry for getting back to you a bit slowly. I only have some time after work for this.)




[GitHub] [airflow] XD-DENG commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1343141125

   > maybe we don't need multiple watchers. maybe a single watcher can watch multiple namespaces.
   
   I did think this idea through earlier, but I couldn't think of a way to use the existing K8S API to efficiently monitor multiple namespaces.




[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1038644918


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -66,7 +67,7 @@ class ResourceVersion:
     """Singleton for tracking resourceVersion from Kubernetes"""
 
     _instance = None
-    resource_version = "0"
+    resource_version: dict[str | None, str | None] = {}

Review Comment:
   Hi @dstandish, when `multi_namespace_mode` is `True` and `namespace_list` is not specified, that means the user still wants to use the cluster-scope role approach. In this case, the argument `namespace` will be `None` for the corresponding `KubernetesJobWatcher`. That's why the key can be `None`.
   
   But the value doesn't have to be `str | None`. So I will change this to `resource_version: dict[str | None, str] = {}`.
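
   To make the shape concrete, here is a simplified sketch of a singleton that tracks one resource version per watcher key, with `None` standing for the cluster-wide watcher. It is modelled on the diff above and is not the full Airflow class.

```python
from typing import Dict, Optional


class ResourceVersion:
    """Singleton sketch: one resourceVersion string per watched namespace key."""

    _instance = None
    # Key is a namespace name, or None for the all-namespaces watcher.
    resource_version: Dict[Optional[str], str] = {}

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
```

   Since every call to `ResourceVersion()` returns the same instance, resetting one key to `"0"` affects only that namespace's watcher and leaves the others untouched.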



##########
airflow/executors/kubernetes_executor.py:
##########
@@ -288,15 +294,33 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:

Review Comment:
   The same as clarified above.





[GitHub] [airflow] XD-DENG commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1345156469

   Thanks again @potiuk. The hint you pointed out was very useful. After adding https://github.com/apache/airflow/pull/28047/commits/cf8ce596e8fc82fec72b97860510fa4e908dd4d9, the CI is finally running successfully! (We just needed to end the executor explicitly in the tests.)
   
   There are three items we would like to check further later:
   - File Task Handler cannot read logs properly from K8S tasks in `multi_namespace_mode` (it's designed in a way that only handles one K8S namespace). It's a separate issue from this PR though.
   - @uranusjr suggested further refactoring the `ResourceVersion` class, but agreed it can be done later as a separate PR.
   - @dstandish suggested we consider using threads to manage multiple watchers.
   
   But for now I believe we are good to go ahead and merge this PR itself.
   
   I would like to add @dstandish and @potiuk as co-authors for your significant contributions to this PR, if you have no objection :-)




[GitHub] [airflow] dstandish commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1343175116

   I guess it depends on what the memory overhead really is. if it's not really that big maybe we think of it as a CI issue... otherwise maybe best to find a way to thread 🤷
   




[GitHub] [airflow] potiuk commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1344326257

   Added the "debug ci resources" label and re-ran it. Let's see where it gets us. You will be able to see the regular dumps of memory in the task outputs while the tests are running (and if you run them locally you can also track it in a similar way).




[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041753868


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -140,9 +141,14 @@ def _run(
 
         last_resource_version: str | None = None
         if self.multi_namespace_mode:
-            list_worker_pods = functools.partial(
-                watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
-            )
+            if self.kube_config.multi_namespace_mode_namespace_list:
+                list_worker_pods = functools.partial(
+                    watcher.stream, kube_client.list_namespaced_pod, self.namespace, **kwargs
+                )
+            else:
+                list_worker_pods = functools.partial(
+                    watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
+                )

Review Comment:
   @dstandish, great point about skipping the config check and simplifying the checks! I have added https://github.com/apache/airflow/pull/28047/commits/2aeb76939d1fdeecc04c1901d85244451de7d2fe to address this.
   
   But IMHO I'm not sure we should add one more private method for it, as we don't re-use it anywhere, and passing the args around doesn't seem clean (I did abstract a similar case in the `KubernetesExecutor` code, the `_list_pods` method, as it's re-used multiple times)





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042905171


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -287,15 +293,35 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [None]
             )
-            ResourceVersion().resource_version = "0"
-            self.kube_watcher = self._make_kube_watcher()
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers
+
+    def _health_check_kube_watchers(self):
+        for namespace, kube_watcher in self.kube_watchers.items():
+            if kube_watcher.is_alive():
+                self.log.debug("KubeJobWatcher for namespace %s alive, continuing", namespace)
+            else:
+                self.log.error(
+                    (
+                        "Error while health checking kube watcher process for namespace %s. "
+                        "Process died for unknown reasons"
+                    ),
+                    namespace,
+                )
+                ResourceVersion().resource_version[namespace] = "0"
+                self.kube_watchers[namespace] = self._make_kube_watcher(namespace)
 
     def run_next(self, next_job: KubernetesJobType) -> None:

Review Comment:
   cool
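
   For readers following the hunk above, here is a minimal standalone sketch of the per-namespace health check pattern. It is not the executor's code: `restart_dead_watchers`, `resource_versions`, and `make_watcher` are hypothetical names, and a plain dict stands in for `ResourceVersion().resource_version`.

```python
from __future__ import annotations

from typing import Any, Callable


def restart_dead_watchers(
    watchers: dict[str | None, Any],
    resource_versions: dict[str | None, str],
    make_watcher: Callable[[str | None], Any],
) -> list[str | None]:
    """Replace every dead watcher and return the namespaces that were restarted.

    Hypothetical helper: each watcher only needs an ``is_alive()`` method.
    """
    restarted = []
    # Iterate over a copy so that replacing entries is safe during the loop.
    for namespace, watcher in list(watchers.items()):
        if watcher.is_alive():
            continue
        # Reset only this namespace's resource version, then start a new watcher.
        resource_versions[namespace] = "0"
        watchers[namespace] = make_watcher(namespace)
        restarted.append(namespace)
    return restarted
```

   Keying both dicts by namespace means one dead watcher does not reset the state of the others.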





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: Ensure KubernetesExecutor's multi_namespace_mode work without cluster-scope role

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1038648026


##########
airflow/config_templates/default_airflow.cfg:
##########
@@ -1197,9 +1197,14 @@ delete_worker_pods_on_failure = False
 worker_pods_creation_batch_size = 1
 
 # Allows users to launch pods in multiple namespaces.
-# Will require creating a cluster-role for the scheduler
+# Will require creating a cluster-role for the scheduler, or use namespace_list configuration.
 multi_namespace_mode = False
 
+# If multi_namespace_mode is True while scheduler does not have a cluster-role,
+# give the list of namespaces where the scheduler will schedule jobs
+# Scheduler needs to have the necessary permissions in these namespaces.
+namespace_list =

Review Comment:
   Sure, that sounds sensible to me.
   
   Let me add a follow-up commit.
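
   For illustration only, a minimal sketch of how a value for this option could be turned into a list of namespaces. The comma-separated format and the helper name `parse_namespace_list` are assumptions; the hunk above does not specify either.

```python
from __future__ import annotations


def parse_namespace_list(raw: str) -> list[str]:
    """Split a comma-separated option value into namespace names.

    Assumption: the option is a comma-separated string. Blank entries and
    surrounding whitespace are dropped, so an empty option gives an empty list.
    """
    return [ns.strip() for ns in raw.split(",") if ns.strip()]
```

   With this sketch an unset option yields an empty list, which a caller could treat as "no namespace list configured".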





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041206306


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -287,15 +293,35 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [None]
             )
-            ResourceVersion().resource_version = "0"
-            self.kube_watcher = self._make_kube_watcher()
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers

Review Comment:
   I don't think they are "golfy" at all! I do appreciate you taking the time to suggest improvements on these details, and I find them extremely useful!
   
   Will get back to you here later.
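
   As a reference point for this thread, the namespace selection in the hunk above can be written as a small standalone function. This is only a sketch: the function name and its parameters are hypothetical and simply mirror the `kube_config` attributes used in the diff.

```python
from __future__ import annotations


def namespaces_to_watch(
    multi_namespace_mode: bool,
    namespace_list: list[str],
    kube_namespace: str,
) -> list[str | None]:
    """Return the namespaces that each need their own watcher.

    ``None`` stands for "all namespaces", which is the case that needs a
    cluster-scope role.
    """
    if not multi_namespace_mode:
        # Single-namespace mode: watch only the configured namespace.
        return [kube_namespace]
    # Multi-namespace mode: use the explicit list when one is given.
    return list(namespace_list) if namespace_list else [None]
```

   Returning early for the single-namespace case removes the need for the nested conditional expression.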





[GitHub] [airflow] XD-DENG commented on pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list to avoid requiring cluster role

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on PR #28047:
URL: https://github.com/apache/airflow/pull/28047#issuecomment-1352614715

   Hi @dstandish, FYI: for the File Task Handler issue we discussed earlier, I'm preparing the fix, and it should be ready within this week.




[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1042932930


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -79,7 +81,7 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 
     def __init__(
         self,
-        namespace: str | None,
+        namespace: str,
         multi_namespace_mode: bool,

Review Comment:
   Nice catch! Addressed with https://github.com/apache/airflow/pull/28047/commits/742468ef37a0725d657ac4790f9c3daf22df9766





[GitHub] [airflow] XD-DENG commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1044093803


##########
airflow/config_templates/config.yml:
##########
@@ -2422,11 +2422,21 @@
     - name: multi_namespace_mode
       description: |
         Allows users to launch pods in multiple namespaces.
-        Will require creating a cluster-role for the scheduler
+        Will require creating a cluster-role for the scheduler,
+        or use multi_namespace_mode_namespace_list configuration.
       version_added: 1.10.12
       type: boolean
       example: ~
       default: "False"
+    - name: multi_namespace_mode_namespace_list
+      description: |
+        If multi_namespace_mode is True while scheduler does not have a cluster-role,
+        give the list of namespaces where the scheduler will schedule jobs
+        Scheduler needs to have the necessary permissions in these namespaces.
+      version_added: 2.5.1

Review Comment:
   Yep I agree. Will update to 2.6.0
   
   (I just noticed this had not been updated, so I changed it to a "random" value to see whether it was related to the weird CI errors. Obviously no luck.)





[GitHub] [airflow] dstandish commented on a diff in pull request #28047: KubernetesExecutor multi_namespace_mode can use namespace list

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #28047:
URL: https://github.com/apache/airflow/pull/28047#discussion_r1041841300


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -287,15 +293,35 @@ def _make_kube_watcher(self) -> KubernetesJobWatcher:
         watcher.start()
         return watcher
 
-    def _health_check_kube_watcher(self):
-        if self.kube_watcher.is_alive():
-            self.log.debug("KubeJobWatcher alive, continuing")
-        else:
-            self.log.error(
-                "Error while health checking kube watcher process. Process died for unknown reasons"
+    def _make_kube_watchers(self) -> dict[str | None, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [None]
             )
-            ResourceVersion().resource_version = "0"
-            self.kube_watcher = self._make_kube_watcher()
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers
+
+    def _health_check_kube_watchers(self):
+        for namespace, kube_watcher in self.kube_watchers.items():
+            if kube_watcher.is_alive():
+                self.log.debug("KubeJobWatcher for namespace %s alive, continuing", namespace)
+            else:
+                self.log.error(
+                    (
+                        "Error while health checking kube watcher process for namespace %s. "
+                        "Process died for unknown reasons"
+                    ),
+                    namespace,
+                )
+                ResourceVersion().resource_version[namespace] = "0"
+                self.kube_watchers[namespace] = self._make_kube_watcher(namespace)
 
     def run_next(self, next_job: KubernetesJobType) -> None:

Review Comment:
   It looks like if you don't set `namespace` in the Airflow config, you may be able to set it with the executor config.
   
   I don't know more than that.
   
   Are you using it now?


