You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/21 17:31:20 UTC

[GitHub] [airflow] VladaZakharova commented on a diff in pull request #28523: Add `KubernetesPodOperatorAsync`

VladaZakharova commented on code in PR #28523:
URL: https://github.com/apache/airflow/pull/28523#discussion_r1054637494


##########
tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py:
##########
@@ -26,19 +26,26 @@
 from kubernetes.client import ApiClient, models as k8s
 from pytest import param
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.kubernetes.secret import Secret
 from airflow.models import DAG, DagModel, DagRun, TaskInstance
 from airflow.models.xcom import XCom
 from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
+    KubernetesPodAsyncOperator,
     KubernetesPodOperator,
+    PodNotFoundException,
     _optionally_suppress,
 )
+from airflow.providers.cncf.kubernetes.triggers.wait_container import PodLaunchTimeoutException
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLoggingStatus
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.types import DagRunType
+
+# from tests.providers.cncf.kubernetes.utils.airflow_util import create_context

Review Comment:
   Do you actually need this import here?



##########
tests/providers/cncf/kubernetes/triggers/test_wait_container.py:
##########
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file

Review Comment:
   Maybe it would be better to name this test file `test_kubernetes_pod.py` to follow the naming convention in Airflow?



##########
airflow/providers/cncf/kubernetes/triggers/wait_container.py:
##########
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   Maybe it would be better to name this file `kubernetes_pod.py` to follow the naming convention in Airflow?



##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -406,3 +408,102 @@ def _get_bool(val) -> bool | None:
         elif val.strip().lower() == "false":
             return False
     return None
+
+
+class KubernetesAsyncHook(KubernetesHook):
+    """
+    Creates Async Kubernetes API connection.
+
+    - use in cluster configuration by using extra field ``in_cluster`` in connection
+    - use custom config by providing path to the file using extra field ``kube_config_path`` in connection
+    - use custom configuration by providing content of kubeconfig file via
+        extra field ``kube_config`` in connection
+    - use default config by providing no extras
+
+    This hook check for configuration option in the above order. Once an option is present it will
+    use this configuration.
+
+    .. seealso::
+        For more information about Kubernetes connection:
+        :doc:`/connections/kubernetes`
+
+    :param conn_id: The :ref:`kubernetes connection <howto/connection:kubernetes>`
+        to Kubernetes cluster.
+    :param client_configuration: Optional dictionary of client configuration params.
+        Passed on to kubernetes client.
+    :param cluster_context: Optionally specify a context to use (e.g. if you have multiple
+        in your kubeconfig.
+    :param config_file: Path to kubeconfig file.
+    :param in_cluster: Set to ``True`` if running from within a kubernetes cluster.
+    :param disable_verify_ssl: Set to ``True`` if SSL verification should be disabled.
+    :param disable_tcp_keepalive: Set to ``True`` if you want to disable keepalive logic.
+    """
+
+    async def _load_config(self) -> client.ApiClient:
+        """
+        Load config to interact with Kubernetes
+
+        cluster_context: Optional[str] = None,
+        config_file: Optional[str] = None,
+        in_cluster: Optional[bool] = None,
+
+        """
+        if self.conn_id:
+            connection = await sync_to_async(self.get_connection)(self.conn_id)
+            extras = connection.extra_dejson
+        else:
+            extras = {}
+        in_cluster = self._coalesce_param(self.in_cluster, extras.get("extra__kubernetes__in_cluster"))
+        cluster_context = self._coalesce_param(
+            self.cluster_context, extras.get("extra__kubernetes__cluster_context")
+        )
+        kubeconfig_path = self._coalesce_param(
+            self.config_file, extras.get("extra__kubernetes__kube_config_path")
+        )
+        kubeconfig = extras.get("extra__kubernetes__kube_config") or None
+        num_selected_configuration = len([o for o in [in_cluster, kubeconfig, kubeconfig_path] if o])
+
+        if num_selected_configuration > 1:
+            raise AirflowException(
+                "Invalid connection configuration. Options kube_config_path, "
+                "kube_config, in_cluster are mutually exclusive. "
+                "You can only use one option at a time."
+            )
+        if in_cluster:
+            self.log.debug("loading kube_config from: in_cluster configuration")

Review Comment:
   Maybe add a more descriptive comment here to explain what the `in_cluster` parameter actually means?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org