You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:34:12 UTC

[flink-kubernetes-operator] 19/23: moving to single ingress

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit b84af2592d7f9c61ccee33747d8550f1f118f493
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Thu Feb 10 15:35:47 2022 +0100

    moving to single ingress
---
 examples/basic-ingress.yaml                        |   2 +-
 .../controller/FlinkDeploymentController.java      |  16 ++-
 .../operator/reconciler/JobReconciler.java         |   8 +-
 .../operator/reconciler/SessionReconciler.java     |   8 +-
 .../kubernetes/operator/utils/IngressUtils.java    | 112 ++++++++++++++++++++
 .../kubernetes/operator/utils/KubernetesUtils.java | 114 ---------------------
 .../{values.yaml => templates/ingress.yaml}        |  44 +++-----
 helm/flink-operator/templates/rbac.yaml            |   1 +
 helm/flink-operator/values.yaml                    |   3 +
 9 files changed, 154 insertions(+), 154 deletions(-)

diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
index c0ad3a2..d846766 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/basic-ingress.yaml
@@ -20,7 +20,7 @@ apiVersion: flink.apache.org/v1alpha1
 kind: FlinkDeployment
 metadata:
   namespace: default
-  name: basic-example
+  name: basic-ingress
 spec:
   image: flink:1.14.3
   flinkVersion: 1.14.3
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 5ab08e5..09eb536 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
 import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
 import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -77,6 +78,12 @@ public class FlinkDeploymentController
     public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
         LOG.info("Cleaning up application cluster {}", flinkApp.getMetadata().getName());
         FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
+        IngressUtils.updateIngressRules(
+                flinkApp,
+                FlinkUtils.getEffectiveConfig(flinkApp),
+                operatorNamespace,
+                kubernetesClient,
+                true);
         return DeleteControl.defaultDelete();
     }
 
@@ -89,7 +96,7 @@ public class FlinkDeploymentController
         boolean success = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
         if (success) {
             try {
-                success = reconcileFlinkDeployment(flinkApp, effectiveConfig);
+                success = reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
             } catch (Exception e) {
                 throw new RuntimeException(
                         "Error while reconciling deployment change for "
@@ -109,10 +116,11 @@ public class FlinkDeploymentController
     }
 
     private boolean reconcileFlinkDeployment(
-            FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception {
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
+            throws Exception {
         return flinkApp.getSpec().getJob() == null
-                ? sessionReconciler.reconcile(flinkApp, effectiveConfig)
-                : jobReconciler.reconcile(flinkApp, effectiveConfig);
+                ? sessionReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig)
+                : jobReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig);
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 61468e4..564efc9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -26,7 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -51,7 +51,8 @@ public class JobReconciler {
         this.flinkService = flinkService;
     }
 
-    public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
+    public boolean reconcile(
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
             throws Exception {
 
         JobSpec jobSpec = flinkApp.getSpec().getJob();
@@ -65,7 +66,8 @@ public class JobReconciler {
                         flinkApp,
                         effectiveConfig,
                         Optional.ofNullable(jobSpec.getInitialSavepointPath()));
-                KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient);
+                IngressUtils.updateIngressRules(
+                        flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
                 return true;
             } catch (Exception e) {
                 LOG.error("Error while deploying " + flinkApp.getMetadata().getName(), e);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 374ee24..0c7bae5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -21,7 +21,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import org.slf4j.Logger;
@@ -43,13 +43,15 @@ public class SessionReconciler {
         this.flinkService = flinkService;
     }
 
-    public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
+    public boolean reconcile(
+            String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
             throws Exception {
         if (flinkApp.getStatus() == null) {
             flinkApp.setStatus(new FlinkDeploymentStatus());
             try {
                 flinkService.submitSessionCluster(flinkApp, effectiveConfig);
-                KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient);
+                IngressUtils.updateIngressRules(
+                        flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
                 return true;
             } catch (Exception e) {
                 LOG.error("Error while deploying " + flinkApp.getMetadata().getName(), e);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
new file mode 100644
index 0000000..a412592
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Ingress utilities. */
+public class IngressUtils {
+
+    private static final String INGRESS_NAME = "flink-operator";
+    private static final String REST_SVC_NAME_SUFFIX = "-rest";
+
+    private static final Logger LOG = LoggerFactory.getLogger(IngressUtils.class);
+
+    public static void updateIngressRules(
+            FlinkDeployment flinkDeployment,
+            Configuration effectiveConfig,
+            String operatorNamespace,
+            KubernetesClient client,
+            boolean remove) {
+        if (flinkDeployment.getSpec().getIngressDomain() != null) {
+            final IngressRule ingressRule = fromDeployment(flinkDeployment, effectiveConfig);
+            getIngress(operatorNamespace, client)
+                    .ifPresent(
+                            ingress -> {
+                                Ingress updated;
+                                if (remove) {
+                                    updated =
+                                            new IngressBuilder(ingress)
+                                                    .editSpec()
+                                                    .removeFromRules(ingressRule)
+                                                    .endSpec()
+                                                    .build();
+                                } else {
+                                    updated =
+                                            new IngressBuilder(ingress)
+                                                    .editSpec()
+                                                    .addToRules(ingressRule)
+                                                    .endSpec()
+                                                    .build();
+                                }
+                                LOG.info("Updating ingress rules {}", ingress);
+                                client.resourceList(updated)
+                                        .inNamespace(operatorNamespace)
+                                        .createOrReplace();
+                            });
+        }
+    }
+
+    private static Optional<Ingress> getIngress(String operatorNamespace, KubernetesClient client) {
+        return Optional.ofNullable(
+                client.network()
+                        .v1()
+                        .ingresses()
+                        .inNamespace(operatorNamespace)
+                        .withName(INGRESS_NAME)
+                        .get());
+    }
+
+    private static IngressRule fromDeployment(
+            FlinkDeployment flinkDeployment, Configuration effectiveConfig) {
+        final String clusterId = flinkDeployment.getMetadata().getName();
+        final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
+        final String ingressHost = getIngressHost(flinkDeployment, clusterId);
+        return new IngressRule(
+                ingressHost,
+                new HTTPIngressRuleValueBuilder()
+                        .addNewPath()
+                        .withPathType("ImplementationSpecific")
+                        .withNewBackend()
+                        .withNewService()
+                        .withName(clusterId + REST_SVC_NAME_SUFFIX)
+                        .withNewPort()
+                        .withNumber(restPort)
+                        .endPort()
+                        .endService()
+                        .endBackend()
+                        .endPath()
+                        .build());
+    }
+
+    private static String getIngressHost(FlinkDeployment flinkDeployment, String clusterId) {
+        return String.format("%s.%s", clusterId, flinkDeployment.getSpec().getIngressDomain());
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java
deleted file mode 100644
index dd5bcba..0000000
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.utils;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-
-import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.fabric8.kubernetes.api.model.OwnerReference;
-import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder;
-import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
-import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
-import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/** Kubernetes related utilities. */
-public class KubernetesUtils {
-
-    public static final String REST_SVC_NAME_SUFFIX = "-rest";
-    public static final String INGRESS_API_VERSION = "networking.k8s.io/v1";
-
-    private static final Logger LOG = LoggerFactory.getLogger(KubernetesUtils.class);
-
-    public static void deployIngress(
-            FlinkDeployment flinkApp,
-            Configuration effectiveConfig,
-            KubernetesClient kubernetesClient) {
-        if (flinkApp.getSpec().getIngressDomain() != null) {
-            final List<IngressRule> ingressRules = new ArrayList<>();
-            final String clusterId = flinkApp.getMetadata().getName();
-            final String namespace = flinkApp.getMetadata().getNamespace();
-            final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
-            final String ingressHost =
-                    String.format("%s.%s", clusterId, flinkApp.getSpec().getIngressDomain());
-            ingressRules.add(
-                    new IngressRule(
-                            ingressHost,
-                            new HTTPIngressRuleValueBuilder()
-                                    .addNewPath()
-                                    .withPathType("ImplementationSpecific")
-                                    .withNewBackend()
-                                    .withNewService()
-                                    .withName(clusterId + REST_SVC_NAME_SUFFIX)
-                                    .withNewPort()
-                                    .withNumber(restPort)
-                                    .endPort()
-                                    .endService()
-                                    .endBackend()
-                                    .endPath()
-                                    .build()));
-            final Ingress ingress =
-                    new IngressBuilder()
-                            .withApiVersion(INGRESS_API_VERSION)
-                            .withNewMetadata()
-                            .withName(clusterId)
-                            .endMetadata()
-                            .withNewSpec()
-                            .withRules(ingressRules)
-                            .endSpec()
-                            .build();
-
-            Deployment deployment =
-                    kubernetesClient
-                            .apps()
-                            .deployments()
-                            .inNamespace(flinkApp.getMetadata().getNamespace())
-                            .withName(flinkApp.getMetadata().getName())
-                            .get();
-            KubernetesUtils.setOwnerReference(deployment, Collections.singletonList(ingress));
-            LOG.info(ingress.toString());
-            kubernetesClient.resourceList(ingress).inNamespace(namespace).createOrReplace();
-        }
-    }
-
-    private static void setOwnerReference(HasMetadata owner, List<HasMetadata> resources) {
-        final OwnerReference ownerReference =
-                new OwnerReferenceBuilder()
-                        .withName(owner.getMetadata().getName())
-                        .withApiVersion(owner.getApiVersion())
-                        .withUid(owner.getMetadata().getUid())
-                        .withKind(owner.getKind())
-                        .withController(true)
-                        .withBlockOwnerDeletion(true)
-                        .build();
-        resources.forEach(
-                resource ->
-                        resource.getMetadata()
-                                .setOwnerReferences(Collections.singletonList(ownerReference)));
-    }
-}
diff --git a/helm/flink-operator/values.yaml b/helm/flink-operator/templates/ingress.yaml
similarity index 69%
copy from helm/flink-operator/values.yaml
copy to helm/flink-operator/templates/ingress.yaml
index dd083e7..ddd990f 100644
--- a/helm/flink-operator/values.yaml
+++ b/helm/flink-operator/templates/ingress.yaml
@@ -15,33 +15,19 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
 ---
-
-operatorNamespace:
-  name: default
-
-image:
-  repository: flink-operator
-  pullPolicy: IfNotPresent
-  tag: latest
-
-rbac:
-  create: true
-
-serviceAccount:
-  create: true
-  annotations: {}
-  name: "flink-operator"
-
-webhook:
-  create: true
-  keystore:
-    useDefaultPassword: true
-  # passwordSecretRef:
-  #   name: jks-password-secret
-  #   key: password-key
-
-imagePullSecrets: []
-nameOverride: ""
-fullnameOverride: ""
+{{- if .Values.ingress.create }}
+apiVersion: networking.k8s.io/v1
+kind: Ingress
+metadata:
+  name: {{ include "flink-operator.name" . }}
+  namespace: {{ .Values.operatorNamespace.name }}
+  labels:
+    {{- include "flink-operator.labels" . | nindent 4 }}
+spec:
+  defaultBackend:
+    service:
+      name: dummy-http-backend
+      port:
+        number: 80
+{{- end }}
diff --git a/helm/flink-operator/templates/rbac.yaml b/helm/flink-operator/templates/rbac.yaml
index 95d23ef..36b92a1 100644
--- a/helm/flink-operator/templates/rbac.yaml
+++ b/helm/flink-operator/templates/rbac.yaml
@@ -43,6 +43,7 @@ rules:
       - events
       - configmaps
       - secrets
+      - nodes
     verbs:
       - "*"
   - apiGroups:
diff --git a/helm/flink-operator/values.yaml b/helm/flink-operator/values.yaml
index dd083e7..f65fd18 100644
--- a/helm/flink-operator/values.yaml
+++ b/helm/flink-operator/values.yaml
@@ -29,6 +29,9 @@ image:
 rbac:
   create: true
 
+ingress:
+  create: false
+
 serviceAccount:
   create: true
   annotations: {}