Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/21 02:12:51 UTC

[GitHub] [airflow] xinbinhuang commented on a diff in pull request #28512: Adding Flink on K8s Operator

xinbinhuang commented on code in PR #28512:
URL: https://github.com/apache/airflow/pull/28512#discussion_r1053911449


##########
airflow/providers/apache/flink/operators/flink_kubernetes.py:
##########
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Sequence
+
+from kubernetes.client import CoreV1Api
+
+from airflow.compat.functools import cached_property
+from airflow.configuration import conf
+from airflow.models import BaseOperator
+from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class FlinkKubernetesOperator(BaseOperator):
+    """
+    Creates a flinkDeployment object in a Kubernetes cluster.
+
+    .. seealso::
+        For more detail about the Flink Deployment object, have a look at the reference:
+        https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/reference/#flinkdeployment
+
+    :param application_file: Defines the Kubernetes 'custom_resource_definition' of 'flinkDeployment' as
+        either a path to a '.yaml' or '.json' file, or a YAML or JSON string.
+    :param namespace: kubernetes namespace in which to create the flinkDeployment
+    :param kubernetes_conn_id: The :ref:`kubernetes connection id <howto/connection:kubernetes>`
+        for the Kubernetes cluster.
+    :param api_group: kubernetes api group of flinkDeployment
+    :param api_version: kubernetes api version of flinkDeployment
+    :param in_cluster: run kubernetes client with in_cluster configuration.
+    :param cluster_context: context that points to kubernetes cluster.
+        Ignored when in_cluster is True. If None, current-context is used.
+    :param config_file: The path to the Kubernetes config file. (templated)
+        If not specified, the default value is ``~/.kube/config``.
+    """
+
+    template_fields: Sequence[str] = ("application_file", "namespace")
+    template_ext: Sequence[str] = (".yaml", ".yml", ".json")
+    ui_color = "#f4a460"
+
+    def __init__(
+        self,
+        *,
+        application_file: str,
+        namespace: str | None = None,
+        kubernetes_conn_id: str = "kubernetes_default",
+        api_group: str = "flink.apache.org",
+        api_version: str = "v1beta1",
+        in_cluster: bool | None = None,
+        cluster_context: str | None = None,
+        config_file: str | None = None,
+        plural: str = "flinkdeployments",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.application_file = application_file
+        self.namespace = namespace
+        self.kubernetes_conn_id = kubernetes_conn_id
+        self.api_group = api_group
+        self.api_version = api_version
+        self.plural = plural
+        self.in_cluster = in_cluster
+        self.cluster_context = cluster_context
+        self.config_file = config_file
+
+    @cached_property
+    def hook(self) -> KubernetesHook:
+        hook = KubernetesHook(
+            conn_id=self.kubernetes_conn_id,
+            in_cluster=self.in_cluster,
+            config_file=self.config_file,
+            cluster_context=self.cluster_context,
+        )
+        self._patch_deprecated_k8s_settings(hook)
+        return hook
+
+    def _patch_deprecated_k8s_settings(self, hook: KubernetesHook):
+        """
+        Here we read config from the core Airflow config [kubernetes] section.
+        In a future release we will stop looking at this section and require users
+        to use Airflow connections to configure KPO.
+
+        When we find values there that we need to apply on the hook, we patch special
+        hook attributes here.
+        """
+        # default for enable_tcp_keepalive is True; patch if False
+        if conf.getboolean("kubernetes", "enable_tcp_keepalive") is False:
+            hook._deprecated_core_disable_tcp_keepalive = True  # noqa
+
+        # default verify_ssl is True; patch if False.
+        if conf.getboolean("kubernetes", "verify_ssl") is False:
+            hook._deprecated_core_disable_verify_ssl = True  # noqa
+
+        # default for in_cluster is True; patch if False and no KPO param.
+        conf_in_cluster = conf.getboolean("kubernetes", "in_cluster")
+        if self.in_cluster is None and conf_in_cluster is False:
+            hook._deprecated_core_in_cluster = conf_in_cluster  # noqa
+
+        # there's no default for cluster context; if we get something (and no KPO param) patch it.
+        conf_cluster_context = conf.get("kubernetes", "cluster_context", fallback=None)
+        if not self.cluster_context and conf_cluster_context:
+            hook._deprecated_core_cluster_context = conf_cluster_context  # noqa
+

Review Comment:
   @chethanuk-plutoflume Are these actually needed? I don't see them being used/referenced anywhere.
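
   On the question above: the execute() part of the file is not visible in this hunk, so it is not clear from this excerpt what consumes these attributes. For orientation only, a flinkDeployment custom resource is typically created through the hook's custom-object API, roughly as sketched below. This assumes KubernetesHook.create_custom_object from the cncf.kubernetes provider (the pattern used by the existing Spark-on-K8s operator); it is not the PR's actual implementation.

       # Rough sketch, not the PR's code: shows how api_group, api_version, plural,
       # namespace, and application_file would map onto the hook's custom-object API.
       from __future__ import annotations

       from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook


       def create_flink_deployment(
           hook: KubernetesHook, application_file: str, namespace: str | None = None
       ) -> dict:
           # create_custom_object accepts the body as a dict or a YAML/JSON string and
           # falls back to the connection/config namespace when namespace is None.
           return hook.create_custom_object(
               group="flink.apache.org",
               version="v1beta1",
               plural="flinkdeployments",
               body=application_file,
               namespace=namespace,
           )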
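
   Separately, for reviewers who want to see the constructor parameters from the hunk in context, a minimal usage sketch follows. The DAG id, schedule, and the flink-deployment.yaml manifest name are illustrative assumptions, not part of the PR.

       # Illustrative DAG: dag_id, start_date, and the manifest filename are assumptions.
       import pendulum

       from airflow import DAG
       from airflow.providers.apache.flink.operators.flink_kubernetes import FlinkKubernetesOperator

       with DAG(
           dag_id="example_flink_kubernetes",
           start_date=pendulum.datetime(2022, 12, 1, tz="UTC"),
           schedule=None,
           catchup=False,
       ) as dag:
           # application_file is templated (see template_fields/template_ext above) and may be
           # a path to a .yaml/.json file or an inline YAML/JSON string for the flinkDeployment.
           submit = FlinkKubernetesOperator(
               task_id="submit_flink_application",
               namespace="default",
               application_file="flink-deployment.yaml",
               kubernetes_conn_id="kubernetes_default",
           )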




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org