You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/02/16 10:34:12 UTC
[flink-kubernetes-operator] 19/23: moving to single ingress
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit b84af2592d7f9c61ccee33747d8550f1f118f493
Author: Matyas Orhidi <ma...@apple.com>
AuthorDate: Thu Feb 10 15:35:47 2022 +0100
moving to single ingress
---
examples/basic-ingress.yaml | 2 +-
.../controller/FlinkDeploymentController.java | 16 ++-
.../operator/reconciler/JobReconciler.java | 8 +-
.../operator/reconciler/SessionReconciler.java | 8 +-
.../kubernetes/operator/utils/IngressUtils.java | 112 ++++++++++++++++++++
.../kubernetes/operator/utils/KubernetesUtils.java | 114 ---------------------
.../{values.yaml => templates/ingress.yaml} | 44 +++-----
helm/flink-operator/templates/rbac.yaml | 1 +
helm/flink-operator/values.yaml | 3 +
9 files changed, 154 insertions(+), 154 deletions(-)
diff --git a/examples/basic-ingress.yaml b/examples/basic-ingress.yaml
index c0ad3a2..d846766 100644
--- a/examples/basic-ingress.yaml
+++ b/examples/basic-ingress.yaml
@@ -20,7 +20,7 @@ apiVersion: flink.apache.org/v1alpha1
kind: FlinkDeployment
metadata:
namespace: default
- name: basic-example
+ name: basic-ingress
spec:
image: flink:1.14.3
flinkVersion: 1.14.3
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 5ab08e5..09eb536 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.observer.JobStatusObserver;
import org.apache.flink.kubernetes.operator.reconciler.JobReconciler;
import org.apache.flink.kubernetes.operator.reconciler.SessionReconciler;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -77,6 +78,12 @@ public class FlinkDeploymentController
public DeleteControl cleanup(FlinkDeployment flinkApp, Context context) {
LOG.info("Cleaning up application cluster {}", flinkApp.getMetadata().getName());
FlinkUtils.deleteCluster(flinkApp, kubernetesClient);
+ IngressUtils.updateIngressRules(
+ flinkApp,
+ FlinkUtils.getEffectiveConfig(flinkApp),
+ operatorNamespace,
+ kubernetesClient,
+ true);
return DeleteControl.defaultDelete();
}
@@ -89,7 +96,7 @@ public class FlinkDeploymentController
boolean success = observer.observeFlinkJobStatus(flinkApp, effectiveConfig);
if (success) {
try {
- success = reconcileFlinkDeployment(flinkApp, effectiveConfig);
+ success = reconcileFlinkDeployment(operatorNamespace, flinkApp, effectiveConfig);
} catch (Exception e) {
throw new RuntimeException(
"Error while reconciling deployment change for "
@@ -109,10 +116,11 @@ public class FlinkDeploymentController
}
private boolean reconcileFlinkDeployment(
- FlinkDeployment flinkApp, Configuration effectiveConfig) throws Exception {
+ String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
+ throws Exception {
return flinkApp.getSpec().getJob() == null
- ? sessionReconciler.reconcile(flinkApp, effectiveConfig)
- : jobReconciler.reconcile(flinkApp, effectiveConfig);
+ ? sessionReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig)
+ : jobReconciler.reconcile(operatorNamespace, flinkApp, effectiveConfig);
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 61468e4..564efc9 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -26,7 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -51,7 +51,8 @@ public class JobReconciler {
this.flinkService = flinkService;
}
- public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
+ public boolean reconcile(
+ String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
throws Exception {
JobSpec jobSpec = flinkApp.getSpec().getJob();
@@ -65,7 +66,8 @@ public class JobReconciler {
flinkApp,
effectiveConfig,
Optional.ofNullable(jobSpec.getInitialSavepointPath()));
- KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient);
+ IngressUtils.updateIngressRules(
+ flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
return true;
} catch (Exception e) {
LOG.error("Error while deploying " + flinkApp.getMetadata().getName(), e);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index 374ee24..0c7bae5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -21,7 +21,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.service.FlinkService;
-import org.apache.flink.kubernetes.operator.utils.KubernetesUtils;
+import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
@@ -43,13 +43,15 @@ public class SessionReconciler {
this.flinkService = flinkService;
}
- public boolean reconcile(FlinkDeployment flinkApp, Configuration effectiveConfig)
+ public boolean reconcile(
+ String operatorNamespace, FlinkDeployment flinkApp, Configuration effectiveConfig)
throws Exception {
if (flinkApp.getStatus() == null) {
flinkApp.setStatus(new FlinkDeploymentStatus());
try {
flinkService.submitSessionCluster(flinkApp, effectiveConfig);
- KubernetesUtils.deployIngress(flinkApp, effectiveConfig, kubernetesClient);
+ IngressUtils.updateIngressRules(
+ flinkApp, effectiveConfig, operatorNamespace, kubernetesClient, false);
return true;
} catch (Exception e) {
LOG.error("Error while deploying " + flinkApp.getMetadata().getName(), e);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
new file mode 100644
index 0000000..a412592
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+
+import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
+import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/** Utilities for maintaining the per-deployment rules of the operator-wide shared ingress. */
+public class IngressUtils {
+
+    private static final String INGRESS_NAME = "flink-operator";
+    private static final String REST_SVC_NAME_SUFFIX = "-rest";
+
+    private static final Logger LOG = LoggerFactory.getLogger(IngressUtils.class);
+
+    /**
+     * Adds or removes the ingress rule of {@code flinkDeployment} on the shared ingress that is
+     * looked up by {@code INGRESS_NAME} in {@code operatorNamespace}.
+     *
+     * <p>Nothing happens when the deployment spec has no ingress domain. When a domain is set but
+     * the shared ingress cannot be found, a warning is logged and no resource is written.
+     *
+     * @param flinkDeployment deployment whose rule is added or removed
+     * @param effectiveConfig configuration from which the REST port of the backend is read
+     * @param operatorNamespace namespace in which the shared ingress is looked up and written
+     * @param client Kubernetes client used to read and write the ingress
+     * @param remove {@code true} to remove the rule, {@code false} to add it
+     */
+    public static void updateIngressRules(
+            FlinkDeployment flinkDeployment,
+            Configuration effectiveConfig,
+            String operatorNamespace,
+            KubernetesClient client,
+            boolean remove) {
+        if (flinkDeployment.getSpec().getIngressDomain() == null) {
+            return;
+        }
+
+        final Optional<Ingress> existing = getIngress(operatorNamespace, client);
+        if (!existing.isPresent()) {
+            // An ingress domain was requested but there is nothing to attach the rule to;
+            // surface that instead of silently skipping the update.
+            LOG.warn(
+                    "Ingress {} not found in namespace {}, skipping ingress rule update for {}",
+                    INGRESS_NAME,
+                    operatorNamespace,
+                    flinkDeployment.getMetadata().getName());
+            return;
+        }
+
+        final IngressRule ingressRule = fromDeployment(flinkDeployment, effectiveConfig);
+
+        // Drop an equal rule first in both modes, so that adding the rule for the same
+        // deployment more than once cannot leave duplicate entries on the ingress.
+        final IngressBuilder withoutRule =
+                new IngressBuilder(existing.get())
+                        .editSpec()
+                        .removeFromRules(ingressRule)
+                        .endSpec();
+        final Ingress updated =
+                remove
+                        ? withoutRule.build()
+                        : withoutRule.editSpec().addToRules(ingressRule).endSpec().build();
+
+        // Log the object that is about to be written rather than the one that was read.
+        LOG.info("Updating ingress rules {}", updated);
+        client.resourceList(updated).inNamespace(operatorNamespace).createOrReplace();
+    }
+
+    /** Fetches the shared ingress by its fixed name; empty when the lookup returns null. */
+    private static Optional<Ingress> getIngress(String operatorNamespace, KubernetesClient client) {
+        return Optional.ofNullable(
+                client.network()
+                        .v1()
+                        .ingresses()
+                        .inNamespace(operatorNamespace)
+                        .withName(INGRESS_NAME)
+                        .get());
+    }
+
+    /**
+     * Builds the rule that maps the deployment's ingress host to the service named
+     * {@code <deployment name>-rest} on the configured REST port.
+     */
+    private static IngressRule fromDeployment(
+            FlinkDeployment flinkDeployment, Configuration effectiveConfig) {
+        final String clusterId = flinkDeployment.getMetadata().getName();
+        final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
+        final String ingressHost = getIngressHost(flinkDeployment, clusterId);
+        return new IngressRule(
+                ingressHost,
+                new HTTPIngressRuleValueBuilder()
+                        .addNewPath()
+                        .withPathType("ImplementationSpecific")
+                        .withNewBackend()
+                        .withNewService()
+                        .withName(clusterId + REST_SVC_NAME_SUFFIX)
+                        .withNewPort()
+                        .withNumber(restPort)
+                        .endPort()
+                        .endService()
+                        .endBackend()
+                        .endPath()
+                        .build());
+    }
+
+    /** Returns the host name {@code <clusterId>.<ingressDomain>} for the deployment. */
+    private static String getIngressHost(FlinkDeployment flinkDeployment, String clusterId) {
+        return String.format("%s.%s", clusterId, flinkDeployment.getSpec().getIngressDomain());
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java
deleted file mode 100644
index dd5bcba..0000000
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/KubernetesUtils.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kubernetes.operator.utils;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.RestOptions;
-import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
-
-import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.fabric8.kubernetes.api.model.OwnerReference;
-import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
-import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder;
-import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
-import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
-import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
-import io.fabric8.kubernetes.client.KubernetesClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/** Kubernetes related utilities. */
-public class KubernetesUtils {
-
- public static final String REST_SVC_NAME_SUFFIX = "-rest";
- public static final String INGRESS_API_VERSION = "networking.k8s.io/v1";
-
- private static final Logger LOG = LoggerFactory.getLogger(KubernetesUtils.class);
-
- public static void deployIngress(
- FlinkDeployment flinkApp,
- Configuration effectiveConfig,
- KubernetesClient kubernetesClient) {
- if (flinkApp.getSpec().getIngressDomain() != null) {
- final List<IngressRule> ingressRules = new ArrayList<>();
- final String clusterId = flinkApp.getMetadata().getName();
- final String namespace = flinkApp.getMetadata().getNamespace();
- final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
- final String ingressHost =
- String.format("%s.%s", clusterId, flinkApp.getSpec().getIngressDomain());
- ingressRules.add(
- new IngressRule(
- ingressHost,
- new HTTPIngressRuleValueBuilder()
- .addNewPath()
- .withPathType("ImplementationSpecific")
- .withNewBackend()
- .withNewService()
- .withName(clusterId + REST_SVC_NAME_SUFFIX)
- .withNewPort()
- .withNumber(restPort)
- .endPort()
- .endService()
- .endBackend()
- .endPath()
- .build()));
- final Ingress ingress =
- new IngressBuilder()
- .withApiVersion(INGRESS_API_VERSION)
- .withNewMetadata()
- .withName(clusterId)
- .endMetadata()
- .withNewSpec()
- .withRules(ingressRules)
- .endSpec()
- .build();
-
- Deployment deployment =
- kubernetesClient
- .apps()
- .deployments()
- .inNamespace(flinkApp.getMetadata().getNamespace())
- .withName(flinkApp.getMetadata().getName())
- .get();
- KubernetesUtils.setOwnerReference(deployment, Collections.singletonList(ingress));
- LOG.info(ingress.toString());
- kubernetesClient.resourceList(ingress).inNamespace(namespace).createOrReplace();
- }
- }
-
- private static void setOwnerReference(HasMetadata owner, List<HasMetadata> resources) {
- final OwnerReference ownerReference =
- new OwnerReferenceBuilder()
- .withName(owner.getMetadata().getName())
- .withApiVersion(owner.getApiVersion())
- .withUid(owner.getMetadata().getUid())
- .withKind(owner.getKind())
- .withController(true)
- .withBlockOwnerDeletion(true)
- .build();
- resources.forEach(
- resource ->
- resource.getMetadata()
- .setOwnerReferences(Collections.singletonList(ownerReference)));
- }
-}
diff --git a/helm/flink-operator/values.yaml b/helm/flink-operator/templates/ingress.yaml
similarity index 69%
copy from helm/flink-operator/values.yaml
copy to helm/flink-operator/templates/ingress.yaml
index dd083e7..ddd990f 100644
--- a/helm/flink-operator/values.yaml
+++ b/helm/flink-operator/templates/ingress.yaml
@@ -15,33 +15,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-
---
-
-operatorNamespace:
- name: default
-
-image:
- repository: flink-operator
- pullPolicy: IfNotPresent
- tag: latest
-
-rbac:
- create: true
-
-serviceAccount:
- create: true
- annotations: {}
- name: "flink-operator"
-
-webhook:
- create: true
- keystore:
- useDefaultPassword: true
- # passwordSecretRef:
- # name: jks-password-secret
- # key: password-key
-
-imagePullSecrets: []
-nameOverride: ""
-fullnameOverride: ""
+{{- if .Values.ingress.create }}
+# Shared Ingress for the operator; rendered only when ingress.create is true in the chart values.
+# It is created without any rules, only a default backend.
+# NOTE(review): IngressUtils looks up an Ingress named exactly "flink-operator" in the operator
+# namespace; confirm the name rendered below matches that value, e.g. when nameOverride is set.
+apiVersion: networking.k8s.io/v1
+kind: Ingress
+metadata:
+ name: {{ include "flink-operator.name" . }}
+ namespace: {{ .Values.operatorNamespace.name }}
+ labels:
+ {{- include "flink-operator.labels" . | nindent 4 }}
+spec:
+ # NOTE(review): presumably a Service called dummy-http-backend has to exist in this namespace;
+ # none is visible in this change - confirm.
+ defaultBackend:
+ service:
+ name: dummy-http-backend
+ port:
+ number: 80
+{{- end }}
diff --git a/helm/flink-operator/templates/rbac.yaml b/helm/flink-operator/templates/rbac.yaml
index 95d23ef..36b92a1 100644
--- a/helm/flink-operator/templates/rbac.yaml
+++ b/helm/flink-operator/templates/rbac.yaml
@@ -43,6 +43,7 @@ rules:
- events
- configmaps
- secrets
+ - nodes
verbs:
- "*"
- apiGroups:
diff --git a/helm/flink-operator/values.yaml b/helm/flink-operator/values.yaml
index dd083e7..f65fd18 100644
--- a/helm/flink-operator/values.yaml
+++ b/helm/flink-operator/values.yaml
@@ -29,6 +29,9 @@ image:
rbac:
create: true
+ingress:
+ create: false
+
serviceAccount:
create: true
annotations: {}