You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2022/06/14 12:37:48 UTC

[flink-kubernetes-operator] branch main updated: [FLINK-27613] Add label for the session job to help list the session cluster

This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new bf60428  [FLINK-27613] Add label for the session job to help list the session cluster
bf60428 is described below

commit bf60428099e8f059c8115550f32a4033efd9d196
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Tue Jun 14 20:37:43 2022 +0800

    [FLINK-27613] Add label for the session job to help list the session cluster
---
 docs/content/docs/operations/helm.md               |  3 +-
 .../kubernetes/operator/crd/CrdConstants.java      |  2 +
 .../operator/admission/AdmissionHandler.java       | 24 +++++-
 .../operator/admission/FlinkOperatorWebhook.java   |  4 +-
 .../admission/mutator/DefaultRequestMutator.java   | 94 ++++++++++++++++++++++
 .../operator/admission/mutator/FlinkMutator.java   | 71 ++++++++++++++++
 .../operator/admission/AdmissionHandlerTest.java   | 56 ++++++++++++-
 .../templates/webhook.yaml                         | 40 ++++++++-
 helm/flink-kubernetes-operator/values.yaml         |  2 +
 9 files changed, 286 insertions(+), 10 deletions(-)

diff --git a/docs/content/docs/operations/helm.md b/docs/content/docs/operations/helm.md
index df2fcd9..249522a 100644
--- a/docs/content/docs/operations/helm.md
+++ b/docs/content/docs/operations/helm.md
@@ -77,7 +77,8 @@ The configurable parameters of the Helm chart and which default values as detail
 | podSecurityContext | Defines privilege and access control settings for a pod or container for pod security context.  | runAsUser: 9999<br/>runAsGroup: 9999 |
 | operatorSecurityContext | Defines privilege and access control settings for a pod or container for operator security context.  | |
 | webhookSecurityContext | Defines privilege and access control settings for a pod or container for webhook security context. | |
-| webhook.create | Whether to enable webhook to create for flink-kubernetes-operator. | true |
+| webhook.create | Whether to enable webhook validator to create for flink-kubernetes-operator.                        | true |
+| wenhook.mutator.create | Whether to enable webhook mutator to create for flink-kubernetes-operator. | True |
 | webhook.keystore | The ConfigMap of webhook key store. | useDefaultPassword: true |
 | defaultConfiguration.create | Whether to enable default configuration to create for flink-kubernetes-operator. | true |
 | defaultConfiguration.append | Whether to append configuration files with configs.  | true |
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java
index b699ad9..d807d33 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/CrdConstants.java
@@ -23,4 +23,6 @@ public class CrdConstants {
     public static final String API_VERSION = "v1beta1";
     public static final String KIND_SESSION_JOB = "FlinkSessionJob";
     public static final String KIND_FLINK_DEPLOYMENT = "FlinkDeployment";
+
+    public static final String LABEL_TARGET_SESSION = "target.session";
 }
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java
index b204699..2a877c9 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.admission;
 
+import org.apache.flink.kubernetes.operator.admission.mutator.DefaultRequestMutator;
+
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -40,6 +42,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview;
 import io.javaoperatorsdk.admissioncontroller.AdmissionController;
+import io.javaoperatorsdk.admissioncontroller.mutation.Mutator;
 import io.javaoperatorsdk.admissioncontroller.validation.Validator;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -62,11 +65,14 @@ public class AdmissionHandler extends SimpleChannelInboundHandler<HttpRequest> {
     private static final Logger LOG = LoggerFactory.getLogger(AdmissionHandler.class);
     private static final ObjectMapper objectMapper = new ObjectMapper();
     protected static final String VALIDATE_REQUEST_PATH = "/validate";
+    protected static final String MUTATOR_REQUEST_PATH = "/mutate";
 
     private final AdmissionController<HasMetadata> validatingController;
+    private final AdmissionController<HasMetadata> mutatorController;
 
-    public AdmissionHandler(Validator<HasMetadata> validator) {
+    public AdmissionHandler(Validator<HasMetadata> validator, Mutator<HasMetadata> mutator) {
         this.validatingController = new AdmissionController<>(validator);
+        this.mutatorController = new AdmissionController<>(new DefaultRequestMutator(mutator));
     }
 
     @Override
@@ -85,11 +91,23 @@ public class AdmissionHandler extends SimpleChannelInboundHandler<HttpRequest> {
                 LOG.error("Failed to validate", e);
                 sendError(ctx, ExceptionUtils.getStackTrace(e));
             }
+        } else if (MUTATOR_REQUEST_PATH.equals(path)) {
+            final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
+            AdmissionReview review;
+            try {
+                InputStream in = new ByteBufInputStream(msgContent);
+                review = objectMapper.readValue(in, AdmissionReview.class);
+                AdmissionReview response = mutatorController.handle(review);
+                sendResponse(ctx, objectMapper.writeValueAsString(response));
+            } catch (Exception e) {
+                LOG.error("Failed to mutate", e);
+                sendError(ctx, ExceptionUtils.getStackTrace(e));
+            }
         } else {
             String error =
                     String.format(
-                            "Illegal path requested: %s. Only %s is accepted.",
-                            path, VALIDATE_REQUEST_PATH);
+                            "Illegal path requested: %s. Only %s or %s is accepted.",
+                            path, VALIDATE_REQUEST_PATH, MUTATOR_REQUEST_PATH);
             LOG.error(error);
             sendError(ctx, error);
         }
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index 21a8a21..a6b9960 100644
--- a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.admission;
 
+import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -61,7 +62,8 @@ public class FlinkOperatorWebhook {
         FlinkConfigManager configManager = new FlinkConfigManager();
         Set<FlinkResourceValidator> validators = ValidatorUtils.discoverValidators(configManager);
         AdmissionHandler endpoint =
-                new AdmissionHandler(new FlinkValidator(validators, configManager));
+                new AdmissionHandler(
+                        new FlinkValidator(validators, configManager), new FlinkMutator());
         ChannelInitializer<SocketChannel> initializer = createChannelInitializer(endpoint);
         NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
         NioEventLoopGroup workerGroup = new NioEventLoopGroup();
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
new file mode 100644
index 0000000..65c3edc
--- /dev/null
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/DefaultRequestMutator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission.mutator;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.KubernetesResource;
+import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
+import io.fabric8.kubernetes.api.model.admission.v1.AdmissionResponse;
+import io.fabric8.zjsonpatch.JsonDiff;
+import io.javaoperatorsdk.admissioncontroller.AdmissionUtils;
+import io.javaoperatorsdk.admissioncontroller.NotAllowedException;
+import io.javaoperatorsdk.admissioncontroller.Operation;
+import io.javaoperatorsdk.admissioncontroller.RequestHandler;
+import io.javaoperatorsdk.admissioncontroller.clone.Cloner;
+import io.javaoperatorsdk.admissioncontroller.clone.ObjectMapperCloner;
+import io.javaoperatorsdk.admissioncontroller.mutation.Mutator;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+/**
+ * The default request mutator. It's copied from the {@link
+ * io.javaoperatorsdk.admissioncontroller.mutation.DefaultRequestMutator} with a modified path diff
+ * util to serialize out include non-null.
+ *
+ * @param <T>
+ */
+public class DefaultRequestMutator<T extends KubernetesResource> implements RequestHandler {
+    private static final ObjectMapper mapper =
+            new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+    private final Mutator<T> mutator;
+    private final Cloner<T> cloner;
+
+    public DefaultRequestMutator(Mutator<T> mutator) {
+        this(mutator, new ObjectMapperCloner<>());
+    }
+
+    public DefaultRequestMutator(Mutator<T> mutator, Cloner<T> cloner) {
+        this.mutator = mutator;
+        this.cloner = cloner;
+    }
+
+    public AdmissionResponse handle(AdmissionRequest admissionRequest) {
+        Operation operation = Operation.valueOf(admissionRequest.getOperation());
+        KubernetesResource originalResource =
+                AdmissionUtils.getTargetResource(admissionRequest, operation);
+        KubernetesResource clonedResource =
+                (KubernetesResource) this.cloner.clone((T) originalResource);
+
+        AdmissionResponse admissionResponse;
+        try {
+            T mutatedResource = this.mutator.mutate((T) clonedResource, operation);
+            admissionResponse = admissionResponseFromMutation(originalResource, mutatedResource);
+        } catch (NotAllowedException e) {
+            admissionResponse = AdmissionUtils.notAllowedExceptionToAdmissionResponse(e);
+        }
+
+        return admissionResponse;
+    }
+
+    public static AdmissionResponse admissionResponseFromMutation(
+            KubernetesResource originalResource, KubernetesResource mutatedResource) {
+        AdmissionResponse admissionResponse = new AdmissionResponse();
+        admissionResponse.setAllowed(true);
+        // It only allowed JSONPatch now, So we should avoid serialize out null value
+        // https://github.com/kubernetes/kubernetes/blob/3f1a9f9f3eaeae3d387b9152ea9aebb52be72319/pkg/apis/admission/types.go#L134
+        admissionResponse.setPatchType("JSONPatch");
+        JsonNode originalResNode = mapper.valueToTree(originalResource);
+        JsonNode mutatedResNode = mapper.valueToTree(mutatedResource);
+        JsonNode diff = JsonDiff.asJson(originalResNode, mutatedResNode);
+        String base64Diff =
+                Base64.getEncoder()
+                        .encodeToString(diff.toString().getBytes(StandardCharsets.UTF_8));
+        admissionResponse.setPatch(base64Diff);
+        return admissionResponse;
+    }
+}
diff --git a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java
new file mode 100644
index 0000000..1a1a997
--- /dev/null
+++ b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.admission.mutator;
+
+import org.apache.flink.kubernetes.operator.crd.CrdConstants;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.admissioncontroller.NotAllowedException;
+import io.javaoperatorsdk.admissioncontroller.Operation;
+import io.javaoperatorsdk.admissioncontroller.mutation.Mutator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+/** The default mutator. */
+public class FlinkMutator implements Mutator<HasMetadata> {
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public HasMetadata mutate(HasMetadata resource, Operation operation)
+            throws NotAllowedException {
+        if (operation == Operation.CREATE) {
+            LOG.debug("Mutating resource {}", resource);
+
+            if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
+                try {
+                    var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
+                    setSessionTargetLabel(sessionJob);
+                    return sessionJob;
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return resource;
+    }
+
+    private void setSessionTargetLabel(FlinkSessionJob flinkSessionJob) {
+        var labels = flinkSessionJob.getMetadata().getLabels();
+        if (labels == null) {
+            labels = new HashMap<>();
+        }
+        var deploymentName = flinkSessionJob.getSpec().getDeploymentName();
+        if (deploymentName != null
+                && !deploymentName.equals(labels.get(CrdConstants.LABEL_TARGET_SESSION))) {
+            labels.put(
+                    CrdConstants.LABEL_TARGET_SESSION,
+                    flinkSessionJob.getSpec().getDeploymentName());
+            flinkSessionJob.getMetadata().setLabels(labels);
+        }
+    }
+}
diff --git a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
index 71adc90..9476bd0 100644
--- a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
+++ b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
@@ -17,12 +17,18 @@
 
 package org.apache.flink.kubernetes.operator.admission;
 
+import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.crd.CrdConstants;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledHeapByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -33,11 +39,14 @@ import com.fasterxml.jackson.databind.exc.MismatchedInputException;
 import io.fabric8.kubernetes.api.model.GroupVersionKind;
 import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
 import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.Base64;
 
 import static io.javaoperatorsdk.admissioncontroller.Operation.CREATE;
+import static org.apache.flink.kubernetes.operator.admission.AdmissionHandler.MUTATOR_REQUEST_PATH;
 import static org.apache.flink.kubernetes.operator.admission.AdmissionHandler.VALIDATE_REQUEST_PATH;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod.GET;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
@@ -53,7 +62,8 @@ public class AdmissionHandlerTest {
             new AdmissionHandler(
                     new FlinkValidator(
                             ValidatorUtils.discoverValidators(new FlinkConfigManager()),
-                            new FlinkConfigManager()));
+                            new FlinkConfigManager()),
+                    new FlinkMutator());
 
     @Test
     public void testHandleIllegalRequest() {
@@ -65,8 +75,8 @@ public class AdmissionHandlerTest {
         assertEquals(INTERNAL_SERVER_ERROR, response.status());
         assertEquals(
                 String.format(
-                        "Illegal path requested: %s. Only %s is accepted.",
-                        illegalRequest, VALIDATE_REQUEST_PATH),
+                        "Illegal path requested: %s. Only %s or %s is accepted.",
+                        illegalRequest, VALIDATE_REQUEST_PATH, MUTATOR_REQUEST_PATH),
                 new String(response.content().array()));
         assertTrue(embeddedChannel.finish());
     }
@@ -114,4 +124,44 @@ public class AdmissionHandlerTest {
         assertEquals(OK, response.status());
         assertTrue(embeddedChannel.finish());
     }
+
+    @Test
+    public void testMutateHandler() throws Exception {
+        final EmbeddedChannel embeddedChannel = new EmbeddedChannel(admissionHandler);
+        var sessionJob = new FlinkSessionJob();
+        sessionJob.setSpec(
+                FlinkSessionJobSpec.builder()
+                        .job(JobSpec.builder().jarURI("http://myjob.jar").build())
+                        .deploymentName("test-session")
+                        .build());
+
+        final AdmissionRequest admissionRequest = new AdmissionRequest();
+        admissionRequest.setOperation(CREATE.name());
+        admissionRequest.setObject(sessionJob);
+        admissionRequest.setKind(
+                new GroupVersionKind(
+                        sessionJob.getGroup(), sessionJob.getVersion(), sessionJob.getKind()));
+        final AdmissionReview admissionReview = new AdmissionReview();
+        admissionReview.setRequest(admissionRequest);
+        embeddedChannel.writeInbound(
+                new DefaultFullHttpRequest(
+                        HTTP_1_1,
+                        GET,
+                        MUTATOR_REQUEST_PATH,
+                        Unpooled.wrappedBuffer(
+                                new ObjectMapper()
+                                        .writeValueAsString(admissionReview)
+                                        .getBytes())));
+        embeddedChannel.writeOutbound(new DefaultFullHttpResponse(HTTP_1_1, OK));
+        final DefaultHttpResponse response = embeddedChannel.readOutbound();
+        assertEquals(OK, response.status());
+        Assertions.assertFalse(embeddedChannel.outboundMessages().isEmpty());
+        var body = embeddedChannel.readOutbound();
+        Assertions.assertNotNull(body);
+        var str = new String(((UnpooledHeapByteBuf) body).array());
+        var review = new ObjectMapper().readValue(str, AdmissionReview.class);
+        var patch = new String(Base64.getDecoder().decode(review.getResponse().getPatch()));
+        Assertions.assertTrue(patch.contains(CrdConstants.LABEL_TARGET_SESSION));
+        assertTrue(embeddedChannel.finish());
+    }
 }
diff --git a/helm/flink-kubernetes-operator/templates/webhook.yaml b/helm/flink-kubernetes-operator/templates/webhook.yaml
index c0ecf67..dbee9db 100644
--- a/helm/flink-kubernetes-operator/templates/webhook.yaml
+++ b/helm/flink-kubernetes-operator/templates/webhook.yaml
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 ---
-{{- if .Values.webhook.create }}
+{{- if or .Values.webhook.create .Values.webhook.mutator.create }}
 ---
 apiVersion: v1
 kind: Service
@@ -75,6 +75,8 @@ metadata:
   namespace: {{ .Release.Namespace }}
 spec:
   selfSigned: {}
+{{- end }}
+{{- if .Values.webhook.create }}
 ---
 apiVersion: admissionregistration.k8s.io/v1
 kind: ValidatingWebhookConfiguration
@@ -109,4 +111,38 @@ webhooks:
         operator: In
         values: [{{- range .Values.watchNamespaces }}{{ . | quote }},{{- end}}]
   {{- end }}
-  {{- end }}
+{{- end }}
+{{ if .Values.webhook.mutator.create }}
+---
+apiVersion: admissionregistration.k8s.io/v1
+kind: MutatingWebhookConfiguration
+metadata:
+  annotations:
+    cert-manager.io/inject-ca-from: {{ .Release.Namespace }}/flink-operator-serving-cert
+  name: flink-operator-{{ .Release.Namespace }}-webhook-configuration
+webhooks:
+  - name: flinkoperator.flink.apache.org
+    admissionReviewVersions: ["v1"]
+    clientConfig:
+      service:
+        name: flink-operator-webhook-service
+        namespace: {{ .Release.Namespace }}
+        path: /mutate
+    failurePolicy: Fail
+    rules:
+      - apiGroups: ["*"]
+        apiVersions: ["*"]
+        scope: "Namespaced"
+        operations:
+          - CREATE
+        resources:
+          - flinksessionjobs
+    sideEffects: None
+    {{- if .Values.watchNamespaces }}
+    namespaceSelector:
+      matchExpressions:
+        - key: kubernetes.io/metadata.name
+          operator: In
+          values: [{{- range .Values.watchNamespaces }}{{ . | quote }},{{- end}}]
+    {{- end }}
+{{- end }}
diff --git a/helm/flink-kubernetes-operator/values.yaml b/helm/flink-kubernetes-operator/values.yaml
index b42eac8..23b9da6 100644
--- a/helm/flink-kubernetes-operator/values.yaml
+++ b/helm/flink-kubernetes-operator/values.yaml
@@ -69,6 +69,8 @@ webhookSecurityContext: {}
 
 webhook:
   create: true
+  mutator:
+    create: true
   keystore:
     useDefaultPassword: true
   # passwordSecretRef: