You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/02/17 06:26:21 UTC

[flink] branch release-1.14 updated: [FLINK-20830][k8s] Add type of Headless_Cluster_IP for external rest service

This is an automated email from the ASF dual-hosted git repository.

wangyang0918 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 68f0688  [FLINK-20830][k8s] Add type of Headless_Cluster_IP for external rest service
68f0688 is described below

commit 68f0688e38a43e99417c26b630395e234f605aff
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Mon Jan 24 16:32:04 2022 +0800

    [FLINK-20830][k8s] Add type of Headless_Cluster_IP for external rest service
    
    This closes #18767.
---
 .../generated/kubernetes_config_configuration.html |  2 +-
 .../kubernetes/KubernetesClusterDescriptor.java    |  7 +-
 .../configuration/KubernetesConfigOptions.java     | 27 +++++-
 .../kubeclient/Fabric8FlinkKubeClient.java         |  5 +-
 .../decorators/ExternalServiceDecorator.java       | 33 ++------
 .../decorators/InternalServiceDecorator.java       | 24 +-----
 .../kubeclient/services/ClusterIPService.java      | 41 ++++++++++
 .../services/HeadlessClusterIPService.java         | 68 ++++++++++++++++
 .../kubeclient/services/LoadBalancerService.java   | 41 ++++++++++
 .../kubeclient/services/NodePortService.java       | 43 ++++++++++
 .../kubeclient/services/ServiceType.java           | 95 ++++++++++++++++++++++
 .../apache/flink/kubernetes/utils/Constants.java   |  2 -
 .../flink/kubernetes/KubernetesClientTestBase.java | 32 +++++++-
 .../decorators/ExternalServiceDecoratorTest.java   | 17 ++++
 .../factory/KubernetesJobManagerFactoryTest.java   |  3 +-
 .../kubeclient/services/ServiceTypeTest.java       | 47 +++++++++++
 16 files changed, 425 insertions(+), 62 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
index 78d51ae..774ec0d 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
@@ -174,7 +174,7 @@
             <td><h5>kubernetes.rest-service.exposed.type</h5></td>
             <td style="word-wrap: break-word;">LoadBalancer</td>
             <td><p>Enum</p></td>
-            <td>The exposed type of the rest service. The exposed rest service could be used to access the Flink’s Web UI and REST endpoint.<br /><br />Possible values:<ul><li>"ClusterIP"</li><li>"NodePort"</li><li>"LoadBalancer"</li></ul></td>
+            <td>The exposed type of the rest service. The exposed rest service could be used to access the Flink’s Web UI and REST endpoint.<br /><br />Possible values:<ul><li>"ClusterIP"</li><li>"NodePort"</li><li>"LoadBalancer"</li><li>"Headless_ClusterIP"</li></ul></td>
         </tr>
         <tr>
             <td><h5>kubernetes.secrets</h5></td>
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
index e291242..02708e8 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
@@ -125,15 +125,16 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
 
     private String getWebMonitorAddress(Configuration configuration) throws Exception {
         AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION;
-        if (configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE)
-                == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
+        final KubernetesConfigOptions.ServiceExposedType serviceType =
+                configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
+        if (serviceType.isClusterIP()) {
             resolution = AddressResolution.NO_ADDRESS_RESOLUTION;
             LOG.warn(
                     "Please note that Flink client operations(e.g. cancel, list, stop,"
                             + " savepoint, etc.) won't work from outside the Kubernetes cluster"
                             + " since '{}' has been set to {}.",
                     KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
-                    KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+                    serviceType);
         }
         return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution);
     }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index a5f61d5..6250435 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -24,6 +24,11 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ExternalResourceOptions;
 import org.apache.flink.configuration.description.Description;
+import org.apache.flink.kubernetes.kubeclient.services.ClusterIPService;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
+import org.apache.flink.kubernetes.kubeclient.services.LoadBalancerService;
+import org.apache.flink.kubernetes.kubeclient.services.NodePortService;
+import org.apache.flink.kubernetes.kubeclient.services.ServiceType;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -471,9 +476,25 @@ public class KubernetesConfigOptions {
 
     /** The flink rest service exposed type. */
     public enum ServiceExposedType {
-        ClusterIP,
-        NodePort,
-        LoadBalancer
+        ClusterIP(ClusterIPService.INSTANCE),
+        NodePort(NodePortService.INSTANCE),
+        LoadBalancer(LoadBalancerService.INSTANCE),
+        Headless_ClusterIP(HeadlessClusterIPService.INSTANCE);
+
+        private final ServiceType serviceType;
+
+        ServiceExposedType(ServiceType serviceType) {
+            this.serviceType = serviceType;
+        }
+
+        public ServiceType serviceType() {
+            return serviceType;
+        }
+
+        /** Check whether it is ClusterIP type. */
+        public boolean isClusterIP() {
+            return this == ClusterIP || this == Headless_ClusterIP;
+        }
     }
 
     /** The flink rest service exposed type. */
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
index 5e9e523..dad7601 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
@@ -30,6 +30,7 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
 import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.kubeclient.services.ServiceType;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
@@ -177,10 +178,10 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
         final int restPort = getRestPortFromExternalService(service);
 
         final KubernetesConfigOptions.ServiceExposedType serviceExposedType =
-                KubernetesConfigOptions.ServiceExposedType.valueOf(service.getSpec().getType());
+                ServiceType.classify(service);
 
         // Return the external service.namespace directly when using ClusterIP.
-        if (serviceExposedType == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
+        if (serviceExposedType.isClusterIP()) {
             return Optional.of(
                     new Endpoint(
                             ExternalServiceDecorator.getNamespacedExternalServiceName(
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java
index e866029..bda6952 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java
@@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.utils.Constants;
 
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Service;
-import io.fabric8.kubernetes.api.model.ServiceBuilder;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -42,37 +41,21 @@ public class ExternalServiceDecorator extends AbstractKubernetesStepDecorator {
 
     @Override
     public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
-        final String serviceName =
-                getExternalServiceName(kubernetesJobManagerParameters.getClusterId());
+        final Service service =
+                kubernetesJobManagerParameters
+                        .getRestServiceExposedType()
+                        .serviceType()
+                        .buildUpExternalRestService(kubernetesJobManagerParameters);
 
-        final Service externalService =
-                new ServiceBuilder()
-                        .withApiVersion(Constants.API_VERSION)
-                        .withNewMetadata()
-                        .withName(serviceName)
-                        .withLabels(kubernetesJobManagerParameters.getCommonLabels())
-                        .withAnnotations(kubernetesJobManagerParameters.getRestServiceAnnotations())
-                        .endMetadata()
-                        .withNewSpec()
-                        .withType(kubernetesJobManagerParameters.getRestServiceExposedType().name())
-                        .withSelector(kubernetesJobManagerParameters.getSelectors())
-                        .addNewPort()
-                        .withName(Constants.REST_PORT_NAME)
-                        .withPort(kubernetesJobManagerParameters.getRestPort())
-                        .withNewTargetPort(kubernetesJobManagerParameters.getRestBindPort())
-                        .endPort()
-                        .endSpec()
-                        .build();
-
-        return Collections.singletonList(externalService);
+        return Collections.singletonList(service);
     }
 
-    /** Generate name of the external Service. */
+    /** Generate name of the external rest Service. */
     public static String getExternalServiceName(String clusterId) {
         return clusterId + Constants.FLINK_REST_SERVICE_SUFFIX;
     }
 
-    /** Generate namespaced name of the external Service. */
+    /** Generate namespaced name of the external rest Service. */
     public static String getNamespacedExternalServiceName(String clusterId, String namespace) {
         return getExternalServiceName(clusterId) + "." + namespace;
     }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java
index 3777f4c..5f07ec0 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java
@@ -20,11 +20,10 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
 
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
-import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
 
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.Service;
-import io.fabric8.kubernetes.api.model.ServiceBuilder;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -55,25 +54,8 @@ public class InternalServiceDecorator extends AbstractKubernetesStepDecorator {
                 getInternalServiceName(kubernetesJobManagerParameters.getClusterId());
 
         final Service headlessService =
-                new ServiceBuilder()
-                        .withApiVersion(Constants.API_VERSION)
-                        .withNewMetadata()
-                        .withName(serviceName)
-                        .withLabels(kubernetesJobManagerParameters.getCommonLabels())
-                        .endMetadata()
-                        .withNewSpec()
-                        .withClusterIP(Constants.HEADLESS_SERVICE_CLUSTER_IP)
-                        .withSelector(kubernetesJobManagerParameters.getSelectors())
-                        .addNewPort()
-                        .withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
-                        .withPort(kubernetesJobManagerParameters.getRPCPort())
-                        .endPort()
-                        .addNewPort()
-                        .withName(Constants.BLOB_SERVER_PORT_NAME)
-                        .withPort(kubernetesJobManagerParameters.getBlobServerPort())
-                        .endPort()
-                        .endSpec()
-                        .build();
+                HeadlessClusterIPService.INSTANCE.buildUpInternalService(
+                        kubernetesJobManagerParameters);
 
         // Set job manager address to namespaced service name
         final String namespace = kubernetesJobManagerParameters.getNamespace();
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ClusterIPService.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ClusterIPService.java
new file mode 100644
index 0000000..a6a3d6b
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ClusterIPService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+
+import io.fabric8.kubernetes.api.model.Service;
+
+/** The service type of ClusterIP. */
+public class ClusterIPService extends ServiceType {
+
+    public static final ClusterIPService INSTANCE = new ClusterIPService();
+
+    @Override
+    public Service buildUpInternalService(
+            KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getType() {
+        return KubernetesConfigOptions.ServiceExposedType.ClusterIP.name();
+    }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
new file mode 100644
index 0000000..ef932ff
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+
+/** The service type of Headless ClusterIP, which is an variant of ClusterIP. */
+public class HeadlessClusterIPService extends ClusterIPService {
+
+    public static final HeadlessClusterIPService INSTANCE = new HeadlessClusterIPService();
+    public static final String HEADLESS_CLUSTER_IP = "None";
+
+    @Override
+    public Service buildUpExternalRestService(
+            KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+        final Service service = super.buildUpExternalRestService(kubernetesJobManagerParameters);
+        service.getSpec().setClusterIP(HEADLESS_CLUSTER_IP);
+        return service;
+    }
+
+    @Override
+    public Service buildUpInternalService(
+            KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+        String serviceName =
+                InternalServiceDecorator.getInternalServiceName(
+                        kubernetesJobManagerParameters.getClusterId());
+        return new ServiceBuilder()
+                .withApiVersion(Constants.API_VERSION)
+                .withNewMetadata()
+                .withName(serviceName)
+                .withLabels(kubernetesJobManagerParameters.getCommonLabels())
+                .endMetadata()
+                .withNewSpec()
+                .withClusterIP(HEADLESS_CLUSTER_IP)
+                .withSelector(kubernetesJobManagerParameters.getSelectors())
+                .addNewPort()
+                .withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
+                .withPort(kubernetesJobManagerParameters.getRPCPort())
+                .endPort()
+                .addNewPort()
+                .withName(Constants.BLOB_SERVER_PORT_NAME)
+                .withPort(kubernetesJobManagerParameters.getBlobServerPort())
+                .endPort()
+                .endSpec()
+                .build();
+    }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
new file mode 100644
index 0000000..57b2a36
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+
+import io.fabric8.kubernetes.api.model.Service;
+
+/** The service type of LoadBalancer. */
+public class LoadBalancerService extends ServiceType {
+
+    public static final LoadBalancerService INSTANCE = new LoadBalancerService();
+
+    @Override
+    public Service buildUpInternalService(
+            KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getType() {
+        return KubernetesConfigOptions.ServiceExposedType.LoadBalancer.name();
+    }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
new file mode 100644
index 0000000..9fcf1a7
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+
+import io.fabric8.kubernetes.api.model.Service;
+
+/** The service type of NodePort. */
+public class NodePortService extends ServiceType {
+
+    public static final NodePortService INSTANCE = new NodePortService();
+
+    private NodePortService() {}
+
+    @Override
+    public Service buildUpInternalService(
+            KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getType() {
+        return KubernetesConfigOptions.ServiceExposedType.NodePort.name();
+    }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java
new file mode 100644
index 0000000..5bfe047
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+
+/** An abstract class represents the service type that flink supported. */
+public abstract class ServiceType {
+
+    /**
+     * Build up the external rest service template, according to the jobManager parameters.
+     *
+     * @param kubernetesJobManagerParameters the parameters of jobManager.
+     * @return the external rest service
+     */
+    public Service buildUpExternalRestService(
+            KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+        final String serviceName =
+                ExternalServiceDecorator.getExternalServiceName(
+                        kubernetesJobManagerParameters.getClusterId());
+
+        return new ServiceBuilder()
+                .withApiVersion(Constants.API_VERSION)
+                .withNewMetadata()
+                .withName(serviceName)
+                .withLabels(kubernetesJobManagerParameters.getCommonLabels())
+                .withAnnotations(kubernetesJobManagerParameters.getRestServiceAnnotations())
+                .endMetadata()
+                .withNewSpec()
+                .withType(
+                        kubernetesJobManagerParameters
+                                .getRestServiceExposedType()
+                                .serviceType()
+                                .getType())
+                .withSelector(kubernetesJobManagerParameters.getSelectors())
+                .addNewPort()
+                .withName(Constants.REST_PORT_NAME)
+                .withPort(kubernetesJobManagerParameters.getRestPort())
+                .withNewTargetPort(kubernetesJobManagerParameters.getRestBindPort())
+                .endPort()
+                .endSpec()
+                .build();
+    }
+
+    /**
+     * Build up the internal service template, according to the jobManager parameters.
+     *
+     * @param kubernetesJobManagerParameters the parameters of jobManager.
+     * @return the internal service
+     */
+    public abstract Service buildUpInternalService(
+            KubernetesJobManagerParameters kubernetesJobManagerParameters);
+
+    /**
+     * Gets the type of the target kubernetes service.
+     *
+     * @return the type of the target kubernetes service.
+     */
+    public abstract String getType();
+
+    // Helper method
+    public static KubernetesConfigOptions.ServiceExposedType classify(Service service) {
+        KubernetesConfigOptions.ServiceExposedType type =
+                KubernetesConfigOptions.ServiceExposedType.valueOf(service.getSpec().getType());
+        if (type == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
+            if (HeadlessClusterIPService.HEADLESS_CLUSTER_IP.equals(
+                    service.getSpec().getClusterIP())) {
+                type = KubernetesConfigOptions.ServiceExposedType.Headless_ClusterIP;
+            }
+        }
+        return type;
+    }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
index aa3bfa5..0d2db5e 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
@@ -82,8 +82,6 @@ public class Constants {
 
     public static final String POD_IP_FIELD_PATH = "status.podIP";
 
-    public static final String HEADLESS_SERVICE_CLUSTER_IP = "None";
-
     public static final int MAXIMUM_CHARACTERS_OF_CLUSTER_ID = 45;
 
     public static final String RESTART_POLICY_OF_NEVER = "Never";
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
index 6f46468..2d192aa 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes;
 
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.util.Preconditions;
 
@@ -144,7 +145,8 @@ public class KubernetesClientTestBase extends KubernetesTestBase {
         return buildExternalService(
                 KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
                 servicePort,
-                serviceStatus);
+                serviceStatus,
+                false);
     }
 
     protected Service buildExternalServiceWithNodePort() {
@@ -162,7 +164,10 @@ public class KubernetesClientTestBase extends KubernetesTestBase {
                         .build();
 
         return buildExternalService(
-                KubernetesConfigOptions.ServiceExposedType.NodePort, servicePort, serviceStatus);
+                KubernetesConfigOptions.ServiceExposedType.NodePort,
+                servicePort,
+                serviceStatus,
+                false);
     }
 
     protected Service buildExternalServiceWithClusterIP() {
@@ -174,13 +179,26 @@ public class KubernetesClientTestBase extends KubernetesTestBase {
                         .build();
 
         return buildExternalService(
-                KubernetesConfigOptions.ServiceExposedType.ClusterIP, servicePort, null);
+                KubernetesConfigOptions.ServiceExposedType.ClusterIP, servicePort, null, false);
+    }
+
+    protected Service buildExternalServiceWithHeadlessClusterIP() {
+        final ServicePort servicePort =
+                new ServicePortBuilder()
+                        .withName(Constants.REST_PORT_NAME)
+                        .withPort(REST_PORT)
+                        .withNewTargetPort(REST_PORT)
+                        .build();
+
+        return buildExternalService(
+                KubernetesConfigOptions.ServiceExposedType.ClusterIP, servicePort, null, true);
     }
 
     private Service buildExternalService(
             KubernetesConfigOptions.ServiceExposedType serviceExposedType,
             ServicePort servicePort,
-            @Nullable ServiceStatus serviceStatus) {
+            @Nullable ServiceStatus serviceStatus,
+            boolean isHeadlessSvc) {
         final ServiceBuilder serviceBuilder =
                 new ServiceBuilder()
                         .editOrNewMetadata()
@@ -196,6 +214,12 @@ public class KubernetesClientTestBase extends KubernetesTestBase {
             serviceBuilder.withStatus(serviceStatus);
         }
 
+        if (isHeadlessSvc) {
+            serviceBuilder
+                    .editOrNewSpec()
+                    .withClusterIP(HeadlessClusterIPService.HEADLESS_CLUSTER_IP)
+                    .endSpec();
+        }
         return serviceBuilder.build();
     }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java
index 41af647..5de0b35 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
 
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
 import org.apache.flink.kubernetes.utils.Constants;
 
 import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -119,4 +120,20 @@ public class ExternalServiceDecoratorTest extends KubernetesJobManagerTestBase {
                 KubernetesConfigOptions.ServiceExposedType.ClusterIP.name(),
                 ((Service) servicesWithClusterIP.get(0)).getSpec().getType());
     }
+
+    @Test
+    public void testSetServiceExposedTypeWithHeadless() throws IOException {
+
+        this.flinkConfig.set(
+                KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+                KubernetesConfigOptions.ServiceExposedType.Headless_ClusterIP);
+        final List<HasMetadata> servicesWithHeadlessClusterIP =
+                this.externalServiceDecorator.buildAccompanyingKubernetesResources();
+        assertThat(
+                ((Service) servicesWithHeadlessClusterIP.get(0)).getSpec().getType(),
+                is(KubernetesConfigOptions.ServiceExposedType.ClusterIP.name()));
+        assertThat(
+                ((Service) servicesWithHeadlessClusterIP.get(0)).getSpec().getClusterIP(),
+                is(HeadlessClusterIPService.HEADLESS_CLUSTER_IP));
+    }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
index f135040..d72e51a 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator
 import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
 import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
 import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
@@ -287,7 +288,7 @@ public class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBas
 
         assertNull(resultInternalService.getSpec().getType());
         assertEquals(
-                Constants.HEADLESS_SERVICE_CLUSTER_IP,
+                HeadlessClusterIPService.HEADLESS_CLUSTER_IP,
                 resultInternalService.getSpec().getClusterIP());
         assertEquals(2, resultInternalService.getSpec().getPorts().size());
         assertEquals(3, resultInternalService.getSpec().getSelector().size());
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java
new file mode 100644
index 0000000..3a11ad6
--- /dev/null
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.KubernetesClientTestBase;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/** Tests for {@link ServiceType}. */
+public class ServiceTypeTest extends KubernetesClientTestBase {
+
+    @Test
+    public void testServiceClassify() {
+        assertThat(
+                ServiceType.classify(buildExternalServiceWithClusterIP()),
+                is(KubernetesConfigOptions.ServiceExposedType.ClusterIP));
+        assertThat(
+                ServiceType.classify(buildExternalServiceWithHeadlessClusterIP()),
+                is(KubernetesConfigOptions.ServiceExposedType.Headless_ClusterIP));
+        assertThat(
+                ServiceType.classify(buildExternalServiceWithNodePort()),
+                is(KubernetesConfigOptions.ServiceExposedType.NodePort));
+        assertThat(
+                ServiceType.classify(buildExternalServiceWithLoadBalancer("", "")),
+                is(KubernetesConfigOptions.ServiceExposedType.LoadBalancer));
+    }
+}