You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/02 01:37:04 UTC

[flink] branch release-1.12 updated: [FLINK-20359][k8s] Added Owner Reference to Job Manager in native kubernetes

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new dc81227  [FLINK-20359][k8s] Added Owner Reference to Job Manager in native kubernetes
dc81227 is described below

commit dc812271102c08c05d966e4c43cb2c1b1c0a03f2
Author: blublinsky <bo...@lightbend.com>
AuthorDate: Wed Dec 30 17:59:58 2020 -0600

    [FLINK-20359][k8s] Added Owner Reference to Job Manager in native kubernetes
    
    This closes #14591
---
 .../generated/kubernetes_config_configuration.html |  6 ++
 .../configuration/KubernetesConfigOptions.java     | 20 ++++++
 .../factory/KubernetesJobManagerFactory.java       |  6 ++
 .../parameters/KubernetesJobManagerParameters.java |  6 ++
 .../resources/KubernetesOwnerReference.java        | 74 ++++++++++++++++++++++
 .../factory/KubernetesJobManagerFactoryTest.java   | 23 +++++++
 6 files changed, 135 insertions(+)

diff --git a/docs/_includes/generated/kubernetes_config_configuration.html b/docs/_includes/generated/kubernetes_config_configuration.html
index c007d18..d1f8c77 100644
--- a/docs/_includes/generated/kubernetes_config_configuration.html
+++ b/docs/_includes/generated/kubernetes_config_configuration.html
@@ -111,6 +111,12 @@
             <td>The node selector to be set for JobManager pod. Specified as key:value pairs separated by commas. For example, environment:production,disk:ssd.</td>
         </tr>
         <tr>
+            <td><h5>kubernetes.jobmanager.owner.reference</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>List&lt;Map&gt;</td>
+            <td>The user-specified <a href="https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html#manual-resource-cleanup">Owner References</a> to be set to the JobManager Deployment. When all the owner resources are deleted, the JobManager Deployment will be deleted automatically, which also deletes all the resources created by this Flink cluster. The value should be formatted as a semicolon-separated list of owner references, where  [...]
+        </tr>
+        <tr>
             <td><h5>kubernetes.jobmanager.service-account</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 50ab3fa..04b92c5 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -89,6 +89,26 @@ public class KubernetesConfigOptions {
                                     + TASK_MANAGER_SERVICE_ACCOUNT.key()
                                     + "' for jobmanager and taskmanager respectively.");
 
+    public static final ConfigOption<List<Map<String, String>>> JOB_MANAGER_OWNER_REFERENCE =
+            key("kubernetes.jobmanager.owner.reference")
+                    .mapType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The user-specified %s to be set to the JobManager Deployment. "
+                                                    + "When all the owner resources are deleted, the JobManager Deployment "
+                                                    + "will be deleted automatically, which also deletes all the resources "
+                                                    + "created by this Flink cluster. The value should be formatted as a "
+                                                    + "semicolon-separated list of owner references, where each owner "
+                                                    + "reference is a comma-separated list of `key:value` pairs. E.g., "
+                                                    + "apiVersion:v1,blockOwnerDeletion:true,controller:true,kind:FlinkApplication,name:flink-app-name,uid:flink-app-uid;"
+                                                    + "apiVersion:v1,kind:Deployment,name:deploy-name,uid:deploy-uid",
+                                            link(
+                                                    "https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/native_kubernetes.html#manual-resource-cleanup",
+                                                    "Owner References"))
+                                    .build());
     public static final ConfigOption<Double> JOB_MANAGER_CPU =
             key("kubernetes.jobmanager.cpu")
                     .doubleType()
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java
index b90880e..33fbf84 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactory.java
@@ -31,6 +31,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
 import org.apache.flink.kubernetes.kubeclient.decorators.KubernetesStepDecorator;
 import org.apache.flink.kubernetes.kubeclient.decorators.MountSecretsDecorator;
 import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesOwnerReference;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
@@ -45,6 +46,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Utility class for constructing all the Kubernetes components on the client-side. This can include
@@ -101,6 +103,10 @@ public class KubernetesJobManagerFactory {
                         KubernetesUtils.getDeploymentName(
                                 kubernetesJobManagerParameters.getClusterId()))
                 .withLabels(kubernetesJobManagerParameters.getLabels())
+                .withOwnerReferences(
+                        kubernetesJobManagerParameters.getOwnerReference().stream()
+                                .map(e -> KubernetesOwnerReference.fromMap(e).getInternalResource())
+                                .collect(Collectors.toList()))
                 .endMetadata()
                 .editOrNewSpec()
                 .withReplicas(1)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
index 7a9d959..83581e4 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
@@ -95,6 +95,12 @@ public class KubernetesJobManagerParameters extends AbstractKubernetesParameters
                 .orElse(Collections.emptyList());
     }
 
+    public List<Map<String, String>> getOwnerReference() {
+        return flinkConfig
+                .getOptional(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE)
+                .orElse(Collections.emptyList());
+    }
+
     public Map<String, String> getRestServiceAnnotations() {
         return flinkConfig
                 .getOptional(KubernetesConfigOptions.REST_SERVICE_ANNOTATIONS)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesOwnerReference.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesOwnerReference.java
new file mode 100644
index 0000000..3d32e84
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesOwnerReference.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import io.fabric8.kubernetes.api.model.OwnerReference;
+import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/** Represent Owner reference resource in kubernetes. */
+public class KubernetesOwnerReference extends KubernetesResource<OwnerReference> {
+
+    private static final String API_VERSION = "apiversion";
+    private static final String DELETION = "blockownerdeletion";
+    private static final String CONTROLLER = "controller";
+    private static final String KIND = "kind";
+    private static final String NAME = "name";
+    private static final String UUID = "uid";
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesOwnerReference.class);
+
+    private KubernetesOwnerReference(OwnerReference ownerReference) {
+        super(ownerReference);
+    }
+
+    public static KubernetesOwnerReference fromMap(Map<String, String> stringMap) {
+        final OwnerReferenceBuilder ownerReferenceBuilder = new OwnerReferenceBuilder();
+        stringMap.forEach(
+                (k, v) -> {
+                    switch (k.toLowerCase()) {
+                        case API_VERSION:
+                            ownerReferenceBuilder.withApiVersion(v);
+                            break;
+                        case DELETION:
+                            ownerReferenceBuilder.withBlockOwnerDeletion(Boolean.valueOf(v));
+                            break;
+                        case CONTROLLER:
+                            ownerReferenceBuilder.withController(Boolean.valueOf(v));
+                            break;
+                        case KIND:
+                            ownerReferenceBuilder.withKind(v);
+                            break;
+                        case NAME:
+                            ownerReferenceBuilder.withName(v);
+                            break;
+                        case UUID:
+                            ownerReferenceBuilder.withUid(v);
+                            break;
+                        default:
+                            LOG.warn("Unrecognized key({}) of toleration, will ignore.", k);
+                            break;
+                    }
+                });
+        return new KubernetesOwnerReference(ownerReferenceBuilder.build());
+    }
+}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
index 7fd157e..5cd12b0 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
@@ -37,16 +37,19 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import io.fabric8.kubernetes.api.model.ConfigMap;
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.OwnerReference;
 import io.fabric8.kubernetes.api.model.PodSpec;
 import io.fabric8.kubernetes.api.model.Quantity;
 import io.fabric8.kubernetes.api.model.Secret;
 import io.fabric8.kubernetes.api.model.Service;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Base64;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +62,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /** General tests for the {@link KubernetesJobManagerFactory}. */
@@ -70,6 +74,19 @@ public class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBas
 
     private static final String EXISTING_HADOOP_CONF_CONFIG_MAP = "hadoop-conf";
 
+    private static final String OWNER_REFERENCE_STRING =
+            "apiVersion:cloudflow.io/v1beta1,blockOwnerDeletion:true,"
+                    + "controller:true,kind:FlinkApplication,name:testapp,uid:e3c9aa3f-cc42-4178-814a-64aa15c82373";
+    private static final List<OwnerReference> OWNER_REFERENCES =
+            Collections.singletonList(
+                    new OwnerReference(
+                            "cloudflow.io/v1beta1",
+                            true,
+                            true,
+                            "FlinkApplication",
+                            "testapp",
+                            "e3c9aa3f-cc42-4178-814a-64aa15c82373"));
+
     protected KubernetesJobManagerSpecification kubernetesJobManagerSpecification;
 
     @Override
@@ -83,6 +100,8 @@ public class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBas
         flinkConfig.set(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "test");
         flinkConfig.set(
                 SecurityOptions.KERBEROS_KRB5_PATH, kerberosDir.toString() + "/" + KRB5_CONF_FILE);
+        flinkConfig.setString(
+                KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE.key(), OWNER_REFERENCE_STRING);
     }
 
     @Override
@@ -109,6 +128,10 @@ public class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBas
         expectedLabels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_JOB_MANAGER);
         expectedLabels.putAll(userLabels);
         assertEquals(expectedLabels, resultDeployment.getMetadata().getLabels());
+
+        assertThat(
+                resultDeployment.getMetadata().getOwnerReferences(),
+                Matchers.containsInAnyOrder(OWNER_REFERENCES.toArray()));
     }
 
     @Test