Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/22 00:51:08 UTC

[GitHub] [airflow] dstandish opened a new pull request, #20578: Use KubernetesHook to create api client in KubernetesPodOperator

dstandish opened a new pull request, #20578:
URL: https://github.com/apache/airflow/pull/20578

   What I've done here is entirely defer to the KubernetesHook with regard to using airflow conn id vs the three other parameters.
   
   I _think_ this should work. 
   
   But one thing I wonder about are these airflow conf settings in `get_kube_client`:
   
   ```python
       if not in_cluster:
           if cluster_context is None:
               cluster_context = conf.get('kubernetes', 'cluster_context', fallback=None)
           if config_file is None:
               config_file = conf.get('kubernetes', 'config_file', fallback=None)
   
       if conf.getboolean('kubernetes', 'enable_tcp_keepalive'):
           _enable_tcp_keepalive()
   
       if not conf.getboolean('kubernetes', 'verify_ssl'):
           _disable_verify_ssl()
   
       client_conf = _get_kube_config(in_cluster, cluster_context, config_file)
       return _get_client_with_patched_configuration(client_conf)
   ```
   
   It doesn't seem that KubernetesHook looks at these settings.  Do you think we need to implement either / or logic here?  That is, only use the K8s hook if an Airflow conn id is provided, and otherwise continue to use `get_kube_client`?
   
   Thanks
   
   cc @jedcunningham


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] dstandish commented on pull request #20578: Use KubernetesHook to create api client in KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #20578:
URL: https://github.com/apache/airflow/pull/20578#issuecomment-1128990997

   Alright, we now have @potiuk's blessing for this one, so we just need some reviews.  @ashb, I discussed this one with you a while back.  @raphaelauv, I know you are interested in this one.




[GitHub] [airflow] jedcunningham commented on a diff in pull request #20578: Use KubernetesHook to create api client in KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #20578:
URL: https://github.com/apache/airflow/pull/20578#discussion_r878546477


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -561,6 +569,39 @@ def dry_run(self) -> None:
         pod = self.build_pod_request_obj()
         print(yaml.dump(prune_dict(pod.to_dict(), mode='strict')))
 
+    def _patch_deprecated_k8s_settings(self, hook: KubernetesHook):
+        """
+        Here we read config from core Airflow config [kubernetes] section.
+        In a future release we will stop looking at this section and require users
+        to use Airflow connections to configure KPO.
+
+        When we find values there that we need to apply on the hook, we patch special
+        hook attributes here.
+        """
+
+        # default for enable_tcp_keepalive is True; patch if False
+        if conf.getboolean('kubernetes', 'enable_tcp_keepalive', fallback=None) is False:

Review Comment:
   For those following along at home, _some_ of these fallbacks had no effect (like this one) as conf falls back to the defaults first. However, some of the others have no default ever, and are necessary.
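A minimal sketch of the lookup order described in this comment, using a plain `ChainMap` rather than Airflow's real `conf` object. The two dictionaries and the `get_setting` helper are hypothetical stand-ins, and `cluster_context` is used only as an illustrative key with no shipped default. The point is that a caller-supplied fallback is consulted last, so it cannot take effect for a key that already has a default.

```python
from collections import ChainMap

# Hypothetical stand-ins: values the user set, and defaults shipped with the software.
user_config = {}
shipped_defaults = {"enable_tcp_keepalive": "True"}


def get_setting(key, fallback=None):
    """Look in user config first, then shipped defaults, then the caller's fallback."""
    return ChainMap(user_config, shipped_defaults).get(key, fallback)


# A shipped default exists, so the fallback is never reached.
keepalive = get_setting("enable_tcp_keepalive", fallback=None)

# No shipped default exists for this key, so the fallback is what comes back.
context = get_setting("cluster_context", fallback=None)
```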





[GitHub] [airflow] dstandish commented on a diff in pull request #20578: Use KubernetesHook to create api client in KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #20578:
URL: https://github.com/apache/airflow/pull/20578#discussion_r878536394


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -561,6 +569,39 @@ def dry_run(self) -> None:
         pod = self.build_pod_request_obj()
         print(yaml.dump(prune_dict(pod.to_dict(), mode='strict')))
 
+    def _patch_deprecated_k8s_settings(self, hook: KubernetesHook):
+        """
+        Here we read config from core Airflow config [kubernetes] section.
+        In a future release we will stop looking at this section and require users
+        to use Airflow connections to configure KPO.
+
+        When we find values there that we need to apply on the hook, we patch special
+        hook attributes here.
+        """
+
+        # default for enable_tcp_keepalive is True; patch if False
+        if conf.getboolean('kubernetes', 'enable_tcp_keepalive', fallback=None) is False:

Review Comment:
   If the value is missing from the config and you don't pass a fallback, it throws an error.
   As to why `None`: it is just a reasonable default that means "not set".  Only when the setting is explicitly `False` do we want the hook to disable TCP keepalive.
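The tri-state check described here can be sketched with the standard library's `configparser`, whose `getboolean` also accepts a `fallback` keyword. This is an illustration under that assumption, not Airflow's `conf` implementation; `should_disable_keepalive` and `make_parser` are hypothetical helpers.

```python
import configparser


def should_disable_keepalive(parser: configparser.ConfigParser) -> bool:
    # None means "not set"; only an explicit False should disable keepalive.
    value = parser.getboolean("kubernetes", "enable_tcp_keepalive", fallback=None)
    return value is False


def make_parser(text: str) -> configparser.ConfigParser:
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return parser


unset = make_parser("[kubernetes]\n")
explicit_false = make_parser("[kubernetes]\nenable_tcp_keepalive = False\n")
explicit_true = make_parser("[kubernetes]\nenable_tcp_keepalive = True\n")
```

Using `is False` rather than `not value` is what keeps the "not set" case from being treated like an explicit opt-out.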





[GitHub] [airflow] dstandish commented on pull request #20578: Use KubernetesHook to create api client in KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on PR #20578:
URL: https://github.com/apache/airflow/pull/20578#issuecomment-1129105596

   > could we rename `config_file` -> `config_file_path` in the kubernetes hook ?
   > 
   > because currently I find the variable name `config_file` very disturbing
   
   oy, yeah, you are right, could be named better.  i think i'm the one that added it a couple months back as prep work for this change, and i just kept the same name used in [KPO](https://github.com/apache/airflow/blob/fe6ec34f0d0e7d47d58913a71777fbfb6239b634/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py#L186).  
   
   But honestly at this point I don't think I'm in favor of changing it.  There's already been so much tumult and change with K8s / KPO, and really it's not _that_ bad, is it?  The risk is what, that a user would think it accepts a file handle?  It's annotated with `Optional[str]` (though maybe we should extend that to accept `Path`).  And the docstring should also clarify (though I guess I need to add that!).
   
   In any case, if we _do_ want to modify it, I think it belongs in a distinct PR for that.
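If the annotation were widened to accept `Path`, as floated in this comment, one option is to normalize the value at the boundary with `os.fspath`. A minimal sketch; `normalize_config_file` is a hypothetical helper and not part of KubernetesHook.

```python
import os
from pathlib import Path
from typing import Optional, Union


def normalize_config_file(config_file: Optional[Union[str, Path]]) -> Optional[str]:
    """Return the kube config location as a plain string, or None when not set."""
    if config_file is None:
        return None
    # os.fspath accepts str and os.PathLike objects; an open file object raises TypeError.
    return os.fspath(config_file)
```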




[GitHub] [airflow] github-actions[bot] commented on pull request #20578: Use KubernetesHook to create api client in KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #20578:
URL: https://github.com/apache/airflow/pull/20578#issuecomment-1131708703

   The PR is likely OK to be merged with just a subset of tests for the default Python and Database versions, without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.




[GitHub] [airflow] raphaelauv commented on pull request #20578: Use KubernetesHook to create api client in KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
raphaelauv commented on PR #20578:
URL: https://github.com/apache/airflow/pull/20578#issuecomment-1129003582

   could we rename `config_file` -> `config_file_path` in the future hook?




[GitHub] [airflow] dstandish commented on a diff in pull request #20578: Use KubernetesHook to create api client in KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
dstandish commented on code in PR #20578:
URL: https://github.com/apache/airflow/pull/20578#discussion_r878536960


##########
tests/providers/cncf/kubernetes/hooks/test_kubernetes.py:
##########
@@ -131,6 +136,76 @@ def test_get_default_client(
             mock_loader.assert_not_called()
         assert isinstance(api_conn, kubernetes.client.api_client.ApiClient)
 
+    @pytest.mark.parametrize(
+        'disable_verify_ssl, conn_id, disable_called',
+        (
+            (True, None, True),
+            (None, None, False),
+            (False, None, False),
+            (None, 'disable_verify_ssl', True),
+            (True, 'disable_verify_ssl', True),
+            (False, 'disable_verify_ssl', False),
+            (None, 'disable_verify_ssl_empty', False),
+            (True, 'disable_verify_ssl_empty', True),
+            (False, 'disable_verify_ssl_empty', False),
+        ),
+    )
+    @patch("kubernetes.config.incluster_config.InClusterConfigLoader", new=MagicMock())
+    @patch(f"{HOOK_MODULE}._disable_verify_ssl")
+    def test_disable_verify_ssl(
+        self,
+        mock_disable,
+        disable_verify_ssl,
+        conn_id,
+        disable_called,
+    ):
+        """
+        Verifies whether disable verify ssl is called depending on combination of hook param and
+        connection extra. Hook param should beat extra.
+        """
+        kubernetes_hook = KubernetesHook(conn_id=conn_id, disable_verify_ssl=disable_verify_ssl)
+        api_conn = kubernetes_hook.get_conn()
+        if disable_called:
+            assert mock_disable.called
+        else:
+            assert not mock_disable.called

Review Comment:
   🤯 mind blown





[GitHub] [airflow] jedcunningham commented on a diff in pull request #20578: Use KubernetesHook to create api client in KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #20578:
URL: https://github.com/apache/airflow/pull/20578#discussion_r878521528


##########
airflow/providers/cncf/kubernetes/CHANGELOG.rst:
##########
@@ -19,6 +19,19 @@
 Changelog
 ---------
 
+main
+....
+
+Features
+~~~~~~~~
+
+KubernetesPodOperator now uses KubernetesHook
+`````````````````````````````````````````````
+
+Previously, KubernetesPodOperator relied on core Airflow configuration (namely setting for kubernetes executor) for certain settings used in client generation.  Now KubernetesPodOperator uses KubernetesHook, and the consideration of core k8s settings is officially deprecated.
+
+If you are using the Airflow configuration settings (e.g. as opposed to operator params) to configure the kubernetes client, then prior to the next major release you will need to add an Airflow connection and set your KPO processes to use that connection.

Review Comment:
   ```suggestion
   If you are using the Airflow configuration settings (e.g. as opposed to operator params) to configure the kubernetes client, then prior to the next major release you will need to add an Airflow connection and set your KPO tasks to use that connection.
   ```
   
   nit



##########
kubernetes_tests/test_kubernetes_pod_operator.py:
##########
@@ -582,10 +586,12 @@ def test_xcom_push(self, xcom_push):
         self.expected_pod['spec']['containers'].append(container)
         assert self.expected_pod == actual_pod
 
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod")
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion")
-    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_envs_from_secrets(self, mock_client, await_pod_completion_mock, create_pod):
+    @mock.patch(f"{POD_MANAGER_CLASS}.create_pod")

Review Comment:
   👍



##########
tests/providers/cncf/kubernetes/hooks/test_kubernetes.py:
##########
@@ -131,6 +136,76 @@ def test_get_default_client(
             mock_loader.assert_not_called()
         assert isinstance(api_conn, kubernetes.client.api_client.ApiClient)
 
+    @pytest.mark.parametrize(
+        'disable_verify_ssl, conn_id, disable_called',
+        (
+            (True, None, True),
+            (None, None, False),
+            (False, None, False),
+            (None, 'disable_verify_ssl', True),
+            (True, 'disable_verify_ssl', True),
+            (False, 'disable_verify_ssl', False),
+            (None, 'disable_verify_ssl_empty', False),
+            (True, 'disable_verify_ssl_empty', True),
+            (False, 'disable_verify_ssl_empty', False),
+        ),
+    )
+    @patch("kubernetes.config.incluster_config.InClusterConfigLoader", new=MagicMock())
+    @patch(f"{HOOK_MODULE}._disable_verify_ssl")
+    def test_disable_verify_ssl(
+        self,
+        mock_disable,
+        disable_verify_ssl,
+        conn_id,
+        disable_called,
+    ):
+        """
+        Verifies whether disable verify ssl is called depending on combination of hook param and
+        connection extra. Hook param should beat extra.
+        """
+        kubernetes_hook = KubernetesHook(conn_id=conn_id, disable_verify_ssl=disable_verify_ssl)
+        api_conn = kubernetes_hook.get_conn()
+        if disable_called:
+            assert mock_disable.called
+        else:
+            assert not mock_disable.called

Review Comment:
   ```suggestion
           assert mock_disable.called is disabled_called
   ```
   
   nit



##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -561,6 +569,39 @@ def dry_run(self) -> None:
         pod = self.build_pod_request_obj()
         print(yaml.dump(prune_dict(pod.to_dict(), mode='strict')))
 
+    def _patch_deprecated_k8s_settings(self, hook: KubernetesHook):
+        """
+        Here we read config from core Airflow config [kubernetes] section.
+        In a future release we will stop looking at this section and require users
+        to use Airflow connections to configure KPO.
+
+        When we find values there that we need to apply on the hook, we patch special
+        hook attributes here.
+        """
+
+        # default for enable_tcp_keepalive is True; patch if False
+        if conf.getboolean('kubernetes', 'enable_tcp_keepalive', fallback=None) is False:

Review Comment:
   I'm curious, why do we need `fallback=None`?



##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -108,37 +121,77 @@ def __init__(
         cluster_context: Optional[str] = None,
         config_file: Optional[str] = None,
         in_cluster: Optional[bool] = None,
+        disable_verify_ssl: Optional[bool] = None,
+        disable_tcp_keepalive: Optional[bool] = None,
     ) -> None:
         super().__init__()
         self.conn_id = conn_id
         self.client_configuration = client_configuration
         self.cluster_context = cluster_context
         self.config_file = config_file
         self.in_cluster = in_cluster
+        self.disable_verify_ssl = disable_verify_ssl
+        self.disable_tcp_keepalive = disable_tcp_keepalive
+
+        # these params used for transition in KPO to K8s hook
+        # for a deprecation period we will continue to consider k8s settings from airflow.cfg
+        self._deprecated_core_disable_tcp_keepalive: Optional[bool] = None
+        self._deprecated_core_disable_verify_ssl: Optional[bool] = None
+        self._deprecated_core_in_cluster: Optional[bool] = None
+        self._deprecated_core_cluster_context: Optional[str] = None
+        self._deprecated_core_config_file: Optional[str] = None
 
     @staticmethod
     def _coalesce_param(*params):
         for param in params:
             if param is not None:
                 return param
 
-    def get_conn(self) -> Any:
-        """Returns kubernetes api session for use with requests"""
+    @cached_property
+    def conn_extras(self):
         if self.conn_id:
             connection = self.get_connection(self.conn_id)
             extras = connection.extra_dejson
         else:
             extras = {}
+        return extras
+
+    def _get_field(self, field_name):
+        if field_name.startswith('extra_'):
+            raise ValueError(
+                f"Got prefixed name {field_name}; please remove the 'extra__kubernetes__' prefix "
+                f"when using this method."
+            )
+        if field_name in self.conn_extras:
+            return self.conn_extras[field_name] or None
+        prefixed_name = f"extra__kubernetes__{field_name}"
+        return self.conn_extras.get(prefixed_name) or None
+
+    @staticmethod
+    def _deprecation_warning_core_param(deprecation_warnings):
+        settings_list_str = ''.join([f"\n\t{k}={v!r}" for k, v in deprecation_warnings])
+        warnings.warn(
+            f"\nApplying core Airflow settings from section [kubernetes] with the following keys:"
+            f"{settings_list_str}\n"
+            "In a future release, KubernetesPodOperator will no longer consider core\n"
+            "airflow settings; define an Airflow connection instead.",

Review Comment:
   ```suggestion
               "Airflow settings; define an Airflow connection instead.",
   ```
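For readers following the hook diff above, here is a minimal standalone sketch of how an explicit hook parameter can take precedence over a connection extra. `coalesce` and `get_field` mirror `_coalesce_param` and `_get_field` from the diff; `resolve_disable_verify_ssl` is a hypothetical helper, since the body of `get_conn` is not shown in this thread.

```python
from typing import Any, Dict, Optional


def coalesce(*params: Any) -> Any:
    """Return the first parameter that is not None (None if all are)."""
    for param in params:
        if param is not None:
            return param
    return None


def get_field(extras: Dict[str, Any], field_name: str) -> Any:
    """Read an extra by short name, then by prefixed name; empty values become None."""
    if field_name in extras:
        return extras[field_name] or None
    return extras.get(f"extra__kubernetes__{field_name}") or None


def resolve_disable_verify_ssl(hook_param: Optional[bool], extras: Dict[str, Any]) -> bool:
    # Hypothetical: the explicit hook parameter wins; the extra applies only when it is None.
    return bool(coalesce(hook_param, get_field(extras, "disable_verify_ssl")))
```

Because `coalesce` tests for `None` rather than truthiness, an explicit `False` on the hook still overrides an extra that is set to `True`.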





[GitHub] [airflow] jedcunningham commented on a diff in pull request #20578: Use KubernetesHook to create api client in KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #20578:
URL: https://github.com/apache/airflow/pull/20578#discussion_r878607885


##########
tests/providers/cncf/kubernetes/hooks/test_kubernetes.py:
##########
@@ -131,6 +136,73 @@ def test_get_default_client(
             mock_loader.assert_not_called()
         assert isinstance(api_conn, kubernetes.client.api_client.ApiClient)
 
+    @pytest.mark.parametrize(
+        'disable_verify_ssl, conn_id, disable_called',
+        (
+            (True, None, True),
+            (None, None, False),
+            (False, None, False),
+            (None, 'disable_verify_ssl', True),
+            (True, 'disable_verify_ssl', True),
+            (False, 'disable_verify_ssl', False),
+            (None, 'disable_verify_ssl_empty', False),
+            (True, 'disable_verify_ssl_empty', True),
+            (False, 'disable_verify_ssl_empty', False),
+        ),
+    )
+    @patch("kubernetes.config.incluster_config.InClusterConfigLoader", new=MagicMock())
+    @patch(f"{HOOK_MODULE}._disable_verify_ssl")
+    def test_disable_verify_ssl(
+        self,
+        mock_disable,
+        disable_verify_ssl,
+        conn_id,
+        disable_called,
+    ):
+        """
+        Verifies whether disable verify ssl is called depending on combination of hook param and
+        connection extra. Hook param should beat extra.
+        """
+        kubernetes_hook = KubernetesHook(conn_id=conn_id, disable_verify_ssl=disable_verify_ssl)
+        api_conn = kubernetes_hook.get_conn()
+        assert mock_disable.called is disabled_called

Review Comment:
   ```suggestion
           assert mock_disable.called is disable_called
   ```
   
   Oops! Sorry for my typo.
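The one-line assertion works because `Mock.called` is a plain bool, so an identity comparison against a boolean test parameter is safe. A small sketch with `unittest.mock`; `maybe_disable` and `check` are hypothetical stand-ins for the code under test and the test body.

```python
from unittest.mock import MagicMock


def maybe_disable(disable: bool, disable_fn) -> None:
    # Hypothetical stand-in for the code under test: call the function only when asked.
    if disable:
        disable_fn()


def check(disable_called: bool) -> bool:
    mock_disable = MagicMock()
    maybe_disable(disable_called, mock_disable)
    # One comparison covers both branches of the original if/else.
    return mock_disable.called is disable_called
```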





[GitHub] [airflow] dstandish merged pull request #20578: Use KubernetesHook to create api client in KubernetesPodOperator

Posted by GitBox <gi...@apache.org>.
dstandish merged PR #20578:
URL: https://github.com/apache/airflow/pull/20578

