You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/14 12:37:48 UTC
[flink-kubernetes-operator] branch main updated: [FLINK-27613] Add label for the session job to help list the session cluster
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new bf60428 [FLINK-27613] Add label for the session job to help list the session cluster
bf60428 is described below
commit bf60428099e8f059c8115550f32a4033efd9d196
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Tue Jun 14 20:37:43 2022 +0800
[FLINK-27613] Add label for the session job to help list the session cluster
---
docs/content/docs/operations/helm.md | 3 +-
.../kubernetes/operator/crd/CrdConstants.java | 2 +
.../operator/admission/AdmissionHandler.java | 24 +++++-
.../operator/admission/FlinkOperatorWebhook.java | 4 +-
.../admission/mutator/DefaultRequestMutator.java | 94 ++++++++++++++++++++++
.../operator/admission/mutator/FlinkMutator.java | 71 ++++++++++++++++
.../operator/admission/AdmissionHandlerTest.java | 56 ++++++++++++-
.../templates/webhook.yaml | 40 ++++++++-
helm/flink-kubernetes-operator/values.yaml | 2 +
9 files changed, 286 insertions(+), 10 deletions(-)
diff --git a/docs/content/docs/operations/helm.md b/docs/content/docs/operations/helm.md
index df2fcd9..249522a 100644
--- a/docs/content/docs/operations/helm.md
+++ b/docs/content/docs/operations/helm.md
@@ -77,7 +77,8 @@ The configurable parameters of the Helm chart and which default values as detail
| podSecurityContext | Defines privilege and access control settings for a pod or container for pod security context. | runAsUser: 9999<br/>runAsGroup: 9999 |
| operatorSecurityContext | Defines privilege and access control settings for a pod or container for operator security context. | |
| webhookSecurityContext | Defines privilege and access control settings for a pod or container for webhook security context. | |
-| webhook.create | Whether to enable webhook to create for flink-kubernetes-operator. | true |
+| webhook.create | Whether to enable webhook validator to create for flink-kubernetes-operator. | true |
+| webhook.mutator.create | Whether to enable webhook mutator to create for flink-kubernetes-operator. | true |
| webhook.keystore | The ConfigMap of webhook key store. | useDefaultPassword: true |
| defaultConfiguration.create | Whether to enable default configuration to create for flink-kubernetes-operator. | true |
| defaultConfiguration.append | Whether to append configuration files with configs. | true |
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java
index b699ad9..d807d33 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java
@@ -23,4 +23,6 @@ public class CrdConstants {
public static final String API_VERSION = "v1beta1";
public static final String KIND_SESSION_JOB = "FlinkSessionJob";
public static final String KIND_FLINK_DEPLOYMENT = "FlinkDeployment";
+
+ public static final String LABEL_TARGET_SESSION = "target.session";
}
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java
index b204699..2a877c9 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java
@@ -17,6 +17,8 @@
package org.apache.flink.kubernetes.operator.admission;
+import org.apache.flink.kubernetes.operator.admission.mutator.DefaultRequestMutator;
+
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -40,6 +42,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview;
import io.javaoperatorsdk.admissioncontroller.AdmissionController;
+import io.javaoperatorsdk.admissioncontroller.mutation.Mutator;
import io.javaoperatorsdk.admissioncontroller.validation.Validator;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
@@ -62,11 +65,14 @@ public class AdmissionHandler extends SimpleChannelInboundHandler<HttpRequest> {
private static final Logger LOG = LoggerFactory.getLogger(AdmissionHandler.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
protected static final String VALIDATE_REQUEST_PATH = "/validate";
+ protected static final String MUTATOR_REQUEST_PATH = "/mutate";
private final AdmissionController<HasMetadata> validatingController;
+ private final AdmissionController<HasMetadata> mutatorController;
- public AdmissionHandler(Validator<HasMetadata> validator) {
+ public AdmissionHandler(Validator<HasMetadata> validator, Mutator<HasMetadata> mutator) {
this.validatingController = new AdmissionController<>(validator);
+ this.mutatorController = new AdmissionController<>(new DefaultRequestMutator(mutator));
}
@Override
@@ -85,11 +91,23 @@ public class AdmissionHandler extends SimpleChannelInboundHandler<HttpRequest> {
LOG.error("Failed to validate", e);
sendError(ctx, ExceptionUtils.getStackTrace(e));
}
+ } else if (MUTATOR_REQUEST_PATH.equals(path)) {
+ final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
+ AdmissionReview review;
+ try {
+ InputStream in = new ByteBufInputStream(msgContent);
+ review = objectMapper.readValue(in, AdmissionReview.class);
+ AdmissionReview response = mutatorController.handle(review);
+ sendResponse(ctx, objectMapper.writeValueAsString(response));
+ } catch (Exception e) {
+ LOG.error("Failed to mutate", e);
+ sendError(ctx, ExceptionUtils.getStackTrace(e));
+ }
} else {
String error =
String.format(
- "Illegal path requested: %s. Only %s is accepted.",
- path, VALIDATE_REQUEST_PATH);
+ "Illegal path requested: %s. Only %s or %s is accepted.",
+ path, VALIDATE_REQUEST_PATH, MUTATOR_REQUEST_PATH);
LOG.error(error);
sendError(ctx, error);
}
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index 21a8a21..a6b9960 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.admission;
+import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -61,7 +62,8 @@ public class FlinkOperatorWebhook {
FlinkConfigManager configManager = new FlinkConfigManager();
Set<FlinkResourceValidator> validators = ValidatorUtils.discoverValidators(configManager);
AdmissionHandler endpoint =
- new AdmissionHandler(new FlinkValidator(validators, configManager));
+ new AdmissionHandler(
+ new FlinkValidator(validators, configManager), new FlinkMutator());
ChannelInitializer<SocketChannel> initializer = createChannelInitializer(endpoint);
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
new file mode 100644
index 0000000..65c3edc
--- /dev/null
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission.mutator;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
+import io.fabric8.kubernetes.api.model.admission.v1.AdmissionResponse;
+import io.fabric8.zjsonpatch.JsonDiff;
+import io.javaoperatorsdk.admissioncontroller.AdmissionUtils;
+import io.javaoperatorsdk.admissioncontroller.NotAllowedException;
+import io.javaoperatorsdk.admissioncontroller.Operation;
+import io.javaoperatorsdk.admissioncontroller.RequestHandler;
+import io.javaoperatorsdk.admissioncontroller.clone.Cloner;
+import io.javaoperatorsdk.admissioncontroller.clone.ObjectMapperCloner;
+import io.javaoperatorsdk.admissioncontroller.mutation.Mutator;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * Request handler that runs a {@link Mutator} against the resource carried by an admission
+ * request and answers with a JSON patch describing the mutation. It's copied from the {@link
+ * io.javaoperatorsdk.admissioncontroller.mutation.DefaultRequestMutator}, with a modified diff
+ * step: the original and mutated resources are converted with an {@link ObjectMapper} that only
+ * serializes non-null values before the patch is computed.
+ *
+ * <p>{@link #handle} resolves the target resource of the request, clones it, passes the clone to
+ * the mutator and turns the result into an {@link AdmissionResponse}. A {@link
+ * NotAllowedException} thrown by the mutator is translated through {@link
+ * AdmissionUtils#notAllowedExceptionToAdmissionResponse}.
+ *
+ * <p>{@link #admissionResponseFromMutation} builds an allowed response whose patch is the
+ * Base64-encoded (UTF-8) JSON diff between the original and the mutated resource.
+ *
+ * @param <T> type of the Kubernetes resource handed to the mutator
+ */
+public class DefaultRequestMutator<T extends KubernetesResource> implements RequestHandler {
+ private static final ObjectMapper mapper = // shared mapper used only to build the trees for the diff
+ new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ private final Mutator<T> mutator;
+ private final Cloner<T> cloner;
+
+ public DefaultRequestMutator(Mutator<T> mutator) {
+ this(mutator, new ObjectMapperCloner<>()); // falls back to an ObjectMapperCloner when no cloner is given
+ }
+
+ public DefaultRequestMutator(Mutator<T> mutator, Cloner<T> cloner) {
+ this.mutator = mutator;
+ this.cloner = cloner;
+ }
+
+ public AdmissionResponse handle(AdmissionRequest admissionRequest) {
+ Operation operation = Operation.valueOf(admissionRequest.getOperation()); // operation name -> enum
+ KubernetesResource originalResource =
+ AdmissionUtils.getTargetResource(admissionRequest, operation);
+ KubernetesResource clonedResource = // the mutator works on a copy so the original stays available for the diff
+ (KubernetesResource) this.cloner.clone((T) originalResource);
+
+ AdmissionResponse admissionResponse;
+ try {
+ T mutatedResource = this.mutator.mutate((T) clonedResource, operation);
+ admissionResponse = admissionResponseFromMutation(originalResource, mutatedResource);
+ } catch (NotAllowedException e) { // the mutator rejected the request: answer with the converted exception
+ admissionResponse = AdmissionUtils.notAllowedExceptionToAdmissionResponse(e);
+ }
+
+ return admissionResponse;
+ }
+
+ public static AdmissionResponse admissionResponseFromMutation(
+ KubernetesResource originalResource, KubernetesResource mutatedResource) {
+ AdmissionResponse admissionResponse = new AdmissionResponse();
+ admissionResponse.setAllowed(true);
+ // Only the JSONPatch patch type is allowed for now, so we avoid serializing null values; see:
+ // https://github.com/kubernetes/kubernetes/blob/3f1a9f9f3eaeae3d387b9152ea9aebb52be72319/pkg/apis/admission/types.go#L134
+ admissionResponse.setPatchType("JSONPatch");
+ JsonNode originalResNode = mapper.valueToTree(originalResource);
+ JsonNode mutatedResNode = mapper.valueToTree(mutatedResource);
+ JsonNode diff = JsonDiff.asJson(originalResNode, mutatedResNode); // patch from original to mutated tree
+ String base64Diff = // the patch is set on the response as Base64 of its UTF-8 JSON text
+ Base64.getEncoder()
+ .encodeToString(diff.toString().getBytes(StandardCharsets.UTF_8));
+ admissionResponse.setPatch(base64Diff);
+ return admissionResponse;
+ }
+}
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java
new file mode 100644
index 0000000..1a1a997
--- /dev/null
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission.mutator;
+
+import org.apache.flink.kubernetes.operator.crd.CrdConstants;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.admissioncontroller.NotAllowedException;
+import io.javaoperatorsdk.admissioncontroller.Operation;
+import io.javaoperatorsdk.admissioncontroller.mutation.Mutator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+/**
+ * The default mutator. For {@code CREATE} operations on resources whose kind equals {@link
+ * CrdConstants#KIND_SESSION_JOB}, the resource is converted to a {@link FlinkSessionJob} and, when
+ * its spec carries a deployment name, the {@link CrdConstants#LABEL_TARGET_SESSION} label is set
+ * to that name. Every other resource or operation is returned unchanged.
+ *
+ * <p>Any exception raised while converting or labelling the session job is rethrown wrapped in a
+ * {@link RuntimeException}.
+ */
+public class FlinkMutator implements Mutator<HasMetadata> {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class);
+ private static final ObjectMapper mapper = new ObjectMapper(); // used to convert the generic resource to a typed one
+
+ @Override
+ public HasMetadata mutate(HasMetadata resource, Operation operation)
+ throws NotAllowedException {
+ if (operation == Operation.CREATE) { // only creations are mutated
+ LOG.debug("Mutating resource {}", resource);
+
+ if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
+ try {
+ var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
+ setSessionTargetLabel(sessionJob);
+ return sessionJob; // the converted, labelled object is returned instead of the input
+ } catch (Exception e) {
+ throw new RuntimeException(e); // conversion/labelling failures surface unchecked, cause preserved
+ }
+ }
+ }
+ return resource; // untouched for other kinds and operations
+ }
+
+ private void setSessionTargetLabel(FlinkSessionJob flinkSessionJob) {
+ var labels = flinkSessionJob.getMetadata().getLabels();
+ if (labels == null) { // no labels yet: start from an empty map
+ labels = new HashMap<>();
+ }
+ var deploymentName = flinkSessionJob.getSpec().getDeploymentName();
+ if (deploymentName != null // nothing to do without a deployment name or when the label already matches
+ && !deploymentName.equals(labels.get(CrdConstants.LABEL_TARGET_SESSION))) {
+ labels.put(
+ CrdConstants.LABEL_TARGET_SESSION,
+ flinkSessionJob.getSpec().getDeploymentName());
+ flinkSessionJob.getMetadata().setLabels(labels); // write back, needed when the map was newly created
+ }
+ }
+}
diff --git a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
index 71adc90..9476bd0 100644
--- a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
+++ b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
@@ -17,12 +17,18 @@
package org.apache.flink.kubernetes.operator.admission;
+import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.CrdConstants;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledHeapByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -33,11 +39,14 @@ import com.fasterxml.jackson.databind.exc.MismatchedInputException;
import io.fabric8.kubernetes.api.model.GroupVersionKind;
import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.Base64;
import static io.javaoperatorsdk.admissioncontroller.Operation.CREATE;
+import static org.apache.flink.kubernetes.operator.admission.AdmissionHandler.MUTATOR_REQUEST_PATH;
import static org.apache.flink.kubernetes.operator.admission.AdmissionHandler.VALIDATE_REQUEST_PATH;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod.GET;
import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
@@ -53,7 +62,8 @@ public class AdmissionHandlerTest {
new AdmissionHandler(
new FlinkValidator(
ValidatorUtils.discoverValidators(new FlinkConfigManager()),
- new FlinkConfigManager()));
+ new FlinkConfigManager()),
+ new FlinkMutator());
@Test
public void testHandleIllegalRequest() {
@@ -65,8 +75,8 @@ public class AdmissionHandlerTest {
assertEquals(INTERNAL_SERVER_ERROR, response.status());
assertEquals(
String.format(
- "Illegal path requested: %s. Only %s is accepted.",
- illegalRequest, VALIDATE_REQUEST_PATH),
+ "Illegal path requested: %s. Only %s or %s is accepted.",
+ illegalRequest, VALIDATE_REQUEST_PATH, MUTATOR_REQUEST_PATH),
new String(response.content().array()));
assertTrue(embeddedChannel.finish());
}
@@ -114,4 +124,44 @@ public class AdmissionHandlerTest {
assertEquals(OK, response.status());
assertTrue(embeddedChannel.finish());
}
+
+ @Test
+ public void testMutateHandler() throws Exception { // posts a session job to the mutate path and checks the returned patch
+ final EmbeddedChannel embeddedChannel = new EmbeddedChannel(admissionHandler);
+ var sessionJob = new FlinkSessionJob();
+ sessionJob.setSpec(
+ FlinkSessionJobSpec.builder()
+ .job(JobSpec.builder().jarURI("http://myjob.jar").build())
+ .deploymentName("test-session") // deployment name that FlinkMutator copies into the label
+ .build());
+
+ final AdmissionRequest admissionRequest = new AdmissionRequest();
+ admissionRequest.setOperation(CREATE.name()); // FlinkMutator only mutates on CREATE
+ admissionRequest.setObject(sessionJob);
+ admissionRequest.setKind(
+ new GroupVersionKind(
+ sessionJob.getGroup(), sessionJob.getVersion(), sessionJob.getKind()));
+ final AdmissionReview admissionReview = new AdmissionReview();
+ admissionReview.setRequest(admissionRequest);
+ embeddedChannel.writeInbound( // request body is the JSON-serialized admission review
+ new DefaultFullHttpRequest(
+ HTTP_1_1,
+ GET,
+ MUTATOR_REQUEST_PATH,
+ Unpooled.wrappedBuffer(
+ new ObjectMapper()
+ .writeValueAsString(admissionReview)
+ .getBytes())));
+ embeddedChannel.writeOutbound(new DefaultFullHttpResponse(HTTP_1_1, OK));
+ final DefaultHttpResponse response = embeddedChannel.readOutbound();
+ assertEquals(OK, response.status());
+ Assertions.assertFalse(embeddedChannel.outboundMessages().isEmpty());
+ var body = embeddedChannel.readOutbound(); // next outbound message, read below as a heap buffer
+ Assertions.assertNotNull(body);
+ var str = new String(((UnpooledHeapByteBuf) body).array());
+ var review = new ObjectMapper().readValue(str, AdmissionReview.class); // the buffer content is parsed as an AdmissionReview
+ var patch = new String(Base64.getDecoder().decode(review.getResponse().getPatch())); // the patch is Base64-encoded
+ Assertions.assertTrue(patch.contains(CrdConstants.LABEL_TARGET_SESSION)); // the patch must mention the target-session label
+ assertTrue(embeddedChannel.finish());
+ }
}
diff --git a/helm/flink-kubernetes-operator/templates/webhook.yaml b/helm/flink-kubernetes-operator/templates/webhook.yaml
index c0ecf67..dbee9db 100644
--- a/helm/flink-kubernetes-operator/templates/webhook.yaml
+++ b/helm/flink-kubernetes-operator/templates/webhook.yaml
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
---
-{{- if .Values.webhook.create }}
+{{- if or .Values.webhook.create .Values.webhook.mutator.create }}
---
apiVersion: v1
kind: Service
@@ -75,6 +75,8 @@ metadata:
namespace: {{ .Release.Namespace }}
spec:
selfSigned: {}
+{{- end }}
+{{- if .Values.webhook.create }}
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
@@ -109,4 +111,38 @@ webhooks:
operator: In
values: [{{- range .Values.watchNamespaces }}{{ . | quote }},{{- end}}]
{{- end }}
- {{- end }}
+{{- end }}
+{{ if .Values.webhook.mutator.create }}
+---
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+ annotations:
+ cert-manager.io/inject-ca-from: {{ .Release.Namespace }}/flink-operator-serving-cert
+ name: flink-operator-{{ .Release.Namespace }}-webhook-configuration
+webhooks:
+ - name: flinkoperator.flink.apache.org
+ admissionReviewVersions: ["v1"]
+ clientConfig:
+ service:
+ name: flink-operator-webhook-service
+ namespace: {{ .Release.Namespace }}
+ path: /mutate
+ failurePolicy: Fail
+ rules:
+ - apiGroups: ["*"]
+ apiVersions: ["*"]
+ scope: "Namespaced"
+ operations:
+ - CREATE
+ resources:
+ - flinksessionjobs
+ sideEffects: None
+ {{- if .Values.watchNamespaces }}
+ namespaceSelector:
+ matchExpressions:
+ - key: kubernetes.io/metadata.name
+ operator: In
+ values: [{{- range .Values.watchNamespaces }}{{ . | quote }},{{- end}}]
+ {{- end }}
+{{- end }}
diff --git a/helm/flink-kubernetes-operator/values.yaml b/helm/flink-kubernetes-operator/values.yaml
index b42eac8..23b9da6 100644
--- a/helm/flink-kubernetes-operator/values.yaml
+++ b/helm/flink-kubernetes-operator/values.yaml
@@ -69,6 +69,8 @@ webhookSecurityContext: {}
webhook:
create: true
+ mutator:
+ create: true
keystore:
useDefaultPassword: true
# passwordSecretRef: