You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by wa...@apache.org on 2022/02/17 06:26:21 UTC
[flink] branch release-1.14 updated: [FLINK-20830][k8s] Add type of Headless_Cluster_IP for external rest service
This is an automated email from the ASF dual-hosted git repository.
wangyang0918 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 68f0688 [FLINK-20830][k8s] Add type of Headless_Cluster_IP for external rest service
68f0688 is described below
commit 68f0688e38a43e99417c26b630395e234f605aff
Author: Aitozi <yu...@alibaba-inc.com>
AuthorDate: Mon Jan 24 16:32:04 2022 +0800
[FLINK-20830][k8s] Add type of Headless_Cluster_IP for external rest service
This closes #18767.
---
.../generated/kubernetes_config_configuration.html | 2 +-
.../kubernetes/KubernetesClusterDescriptor.java | 7 +-
.../configuration/KubernetesConfigOptions.java | 27 +++++-
.../kubeclient/Fabric8FlinkKubeClient.java | 5 +-
.../decorators/ExternalServiceDecorator.java | 33 ++------
.../decorators/InternalServiceDecorator.java | 24 +-----
.../kubeclient/services/ClusterIPService.java | 41 ++++++++++
.../services/HeadlessClusterIPService.java | 68 ++++++++++++++++
.../kubeclient/services/LoadBalancerService.java | 41 ++++++++++
.../kubeclient/services/NodePortService.java | 43 ++++++++++
.../kubeclient/services/ServiceType.java | 95 ++++++++++++++++++++++
.../apache/flink/kubernetes/utils/Constants.java | 2 -
.../flink/kubernetes/KubernetesClientTestBase.java | 32 +++++++-
.../decorators/ExternalServiceDecoratorTest.java | 17 ++++
.../factory/KubernetesJobManagerFactoryTest.java | 3 +-
.../kubeclient/services/ServiceTypeTest.java | 47 +++++++++++
16 files changed, 425 insertions(+), 62 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
index 78d51ae..774ec0d 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
@@ -174,7 +174,7 @@
<td><h5>kubernetes.rest-service.exposed.type</h5></td>
<td style="word-wrap: break-word;">LoadBalancer</td>
<td><p>Enum</p></td>
- <td>The exposed type of the rest service. The exposed rest service could be used to access the Flink’s Web UI and REST endpoint.<br /><br />Possible values:<ul><li>"ClusterIP"</li><li>"NodePort"</li><li>"LoadBalancer"</li></ul></td>
+ <td>The exposed type of the rest service. The exposed rest service could be used to access the Flink’s Web UI and REST endpoint.<br /><br />Possible values:<ul><li>"ClusterIP"</li><li>"NodePort"</li><li>"LoadBalancer"</li><li>"Headless_ClusterIP"</li></ul></td>
</tr>
<tr>
<td><h5>kubernetes.secrets</h5></td>
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
index e291242..02708e8 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java
@@ -125,15 +125,16 @@ public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
private String getWebMonitorAddress(Configuration configuration) throws Exception {
AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION;
- if (configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE)
- == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
+ final KubernetesConfigOptions.ServiceExposedType serviceType =
+ configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
+ if (serviceType.isClusterIP()) {
resolution = AddressResolution.NO_ADDRESS_RESOLUTION;
LOG.warn(
"Please note that Flink client operations(e.g. cancel, list, stop,"
+ " savepoint, etc.) won't work from outside the Kubernetes cluster"
+ " since '{}' has been set to {}.",
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
- KubernetesConfigOptions.ServiceExposedType.ClusterIP);
+ serviceType);
}
return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution);
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index a5f61d5..6250435 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -24,6 +24,11 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ExternalResourceOptions;
import org.apache.flink.configuration.description.Description;
+import org.apache.flink.kubernetes.kubeclient.services.ClusterIPService;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
+import org.apache.flink.kubernetes.kubeclient.services.LoadBalancerService;
+import org.apache.flink.kubernetes.kubeclient.services.NodePortService;
+import org.apache.flink.kubernetes.kubeclient.services.ServiceType;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -471,9 +476,25 @@ public class KubernetesConfigOptions {
/** The flink rest service exposed type. */
public enum ServiceExposedType {
- ClusterIP,
- NodePort,
- LoadBalancer
+ ClusterIP(ClusterIPService.INSTANCE),
+ NodePort(NodePortService.INSTANCE),
+ LoadBalancer(LoadBalancerService.INSTANCE),
+ Headless_ClusterIP(HeadlessClusterIPService.INSTANCE);
+
+ private final ServiceType serviceType;
+
+ ServiceExposedType(ServiceType serviceType) {
+ this.serviceType = serviceType;
+ }
+
+ public ServiceType serviceType() {
+ return serviceType;
+ }
+
+ /** Check whether it is ClusterIP type. */
+ public boolean isClusterIP() {
+ return this == ClusterIP || this == Headless_ClusterIP;
+ }
}
/** The flink rest service exposed type. */
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
index 5e9e523..dad7601 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
@@ -30,6 +30,7 @@ import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.kubeclient.services.ServiceType;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
@@ -177,10 +178,10 @@ public class Fabric8FlinkKubeClient implements FlinkKubeClient {
final int restPort = getRestPortFromExternalService(service);
final KubernetesConfigOptions.ServiceExposedType serviceExposedType =
- KubernetesConfigOptions.ServiceExposedType.valueOf(service.getSpec().getType());
+ ServiceType.classify(service);
// Return the external service.namespace directly when using ClusterIP.
- if (serviceExposedType == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
+ if (serviceExposedType.isClusterIP()) {
return Optional.of(
new Endpoint(
ExternalServiceDecorator.getNamespacedExternalServiceName(
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java
index e866029..bda6952 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecorator.java
@@ -23,7 +23,6 @@ import org.apache.flink.kubernetes.utils.Constants;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Service;
-import io.fabric8.kubernetes.api.model.ServiceBuilder;
import java.io.IOException;
import java.util.Collections;
@@ -42,37 +41,21 @@ public class ExternalServiceDecorator extends AbstractKubernetesStepDecorator {
@Override
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
- final String serviceName =
- getExternalServiceName(kubernetesJobManagerParameters.getClusterId());
+ final Service service =
+ kubernetesJobManagerParameters
+ .getRestServiceExposedType()
+ .serviceType()
+ .buildUpExternalRestService(kubernetesJobManagerParameters);
- final Service externalService =
- new ServiceBuilder()
- .withApiVersion(Constants.API_VERSION)
- .withNewMetadata()
- .withName(serviceName)
- .withLabels(kubernetesJobManagerParameters.getCommonLabels())
- .withAnnotations(kubernetesJobManagerParameters.getRestServiceAnnotations())
- .endMetadata()
- .withNewSpec()
- .withType(kubernetesJobManagerParameters.getRestServiceExposedType().name())
- .withSelector(kubernetesJobManagerParameters.getSelectors())
- .addNewPort()
- .withName(Constants.REST_PORT_NAME)
- .withPort(kubernetesJobManagerParameters.getRestPort())
- .withNewTargetPort(kubernetesJobManagerParameters.getRestBindPort())
- .endPort()
- .endSpec()
- .build();
-
- return Collections.singletonList(externalService);
+ return Collections.singletonList(service);
}
- /** Generate name of the external Service. */
+ /** Generate name of the external rest Service. */
public static String getExternalServiceName(String clusterId) {
return clusterId + Constants.FLINK_REST_SERVICE_SUFFIX;
}
- /** Generate namespaced name of the external Service. */
+ /** Generate namespaced name of the external rest Service. */
public static String getNamespacedExternalServiceName(String clusterId, String namespace) {
return getExternalServiceName(clusterId) + "." + namespace;
}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java
index 3777f4c..5f07ec0 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java
@@ -20,11 +20,10 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
-import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Service;
-import io.fabric8.kubernetes.api.model.ServiceBuilder;
import java.io.IOException;
import java.util.Collections;
@@ -55,25 +54,8 @@ public class InternalServiceDecorator extends AbstractKubernetesStepDecorator {
getInternalServiceName(kubernetesJobManagerParameters.getClusterId());
final Service headlessService =
- new ServiceBuilder()
- .withApiVersion(Constants.API_VERSION)
- .withNewMetadata()
- .withName(serviceName)
- .withLabels(kubernetesJobManagerParameters.getCommonLabels())
- .endMetadata()
- .withNewSpec()
- .withClusterIP(Constants.HEADLESS_SERVICE_CLUSTER_IP)
- .withSelector(kubernetesJobManagerParameters.getSelectors())
- .addNewPort()
- .withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
- .withPort(kubernetesJobManagerParameters.getRPCPort())
- .endPort()
- .addNewPort()
- .withName(Constants.BLOB_SERVER_PORT_NAME)
- .withPort(kubernetesJobManagerParameters.getBlobServerPort())
- .endPort()
- .endSpec()
- .build();
+ HeadlessClusterIPService.INSTANCE.buildUpInternalService(
+ kubernetesJobManagerParameters);
// Set job manager address to namespaced service name
final String namespace = kubernetesJobManagerParameters.getNamespace();
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ClusterIPService.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ClusterIPService.java
new file mode 100644
index 0000000..a6a3d6b
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ClusterIPService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+
+import io.fabric8.kubernetes.api.model.Service;
+
+/** The service type of ClusterIP. */
+public class ClusterIPService extends ServiceType {
+
+ public static final ClusterIPService INSTANCE = new ClusterIPService();
+
+ @Override
+ public Service buildUpInternalService(
+ KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getType() {
+ return KubernetesConfigOptions.ServiceExposedType.ClusterIP.name();
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
new file mode 100644
index 0000000..ef932ff
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/HeadlessClusterIPService.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+
+/** The service type of Headless ClusterIP, which is an variant of ClusterIP. */
+public class HeadlessClusterIPService extends ClusterIPService {
+
+ public static final HeadlessClusterIPService INSTANCE = new HeadlessClusterIPService();
+ public static final String HEADLESS_CLUSTER_IP = "None";
+
+ @Override
+ public Service buildUpExternalRestService(
+ KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+ final Service service = super.buildUpExternalRestService(kubernetesJobManagerParameters);
+ service.getSpec().setClusterIP(HEADLESS_CLUSTER_IP);
+ return service;
+ }
+
+ @Override
+ public Service buildUpInternalService(
+ KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+ String serviceName =
+ InternalServiceDecorator.getInternalServiceName(
+ kubernetesJobManagerParameters.getClusterId());
+ return new ServiceBuilder()
+ .withApiVersion(Constants.API_VERSION)
+ .withNewMetadata()
+ .withName(serviceName)
+ .withLabels(kubernetesJobManagerParameters.getCommonLabels())
+ .endMetadata()
+ .withNewSpec()
+ .withClusterIP(HEADLESS_CLUSTER_IP)
+ .withSelector(kubernetesJobManagerParameters.getSelectors())
+ .addNewPort()
+ .withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
+ .withPort(kubernetesJobManagerParameters.getRPCPort())
+ .endPort()
+ .addNewPort()
+ .withName(Constants.BLOB_SERVER_PORT_NAME)
+ .withPort(kubernetesJobManagerParameters.getBlobServerPort())
+ .endPort()
+ .endSpec()
+ .build();
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
new file mode 100644
index 0000000..57b2a36
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/LoadBalancerService.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+
+import io.fabric8.kubernetes.api.model.Service;
+
+/** The service type of LoadBalancer. */
+public class LoadBalancerService extends ServiceType {
+
+ public static final LoadBalancerService INSTANCE = new LoadBalancerService();
+
+ @Override
+ public Service buildUpInternalService(
+ KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getType() {
+ return KubernetesConfigOptions.ServiceExposedType.LoadBalancer.name();
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
new file mode 100644
index 0000000..9fcf1a7
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/NodePortService.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+
+import io.fabric8.kubernetes.api.model.Service;
+
+/** The service type of NodePort. */
+public class NodePortService extends ServiceType {
+
+ public static final NodePortService INSTANCE = new NodePortService();
+
+ private NodePortService() {}
+
+ @Override
+ public Service buildUpInternalService(
+ KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getType() {
+ return KubernetesConfigOptions.ServiceExposedType.NodePort.name();
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java
new file mode 100644
index 0000000..5bfe047
--- /dev/null
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/services/ServiceType.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+
+/** An abstract class represents the service type that flink supported. */
+public abstract class ServiceType {
+
+ /**
+ * Build up the external rest service template, according to the jobManager parameters.
+ *
+ * @param kubernetesJobManagerParameters the parameters of jobManager.
+ * @return the external rest service
+ */
+ public Service buildUpExternalRestService(
+ KubernetesJobManagerParameters kubernetesJobManagerParameters) {
+ final String serviceName =
+ ExternalServiceDecorator.getExternalServiceName(
+ kubernetesJobManagerParameters.getClusterId());
+
+ return new ServiceBuilder()
+ .withApiVersion(Constants.API_VERSION)
+ .withNewMetadata()
+ .withName(serviceName)
+ .withLabels(kubernetesJobManagerParameters.getCommonLabels())
+ .withAnnotations(kubernetesJobManagerParameters.getRestServiceAnnotations())
+ .endMetadata()
+ .withNewSpec()
+ .withType(
+ kubernetesJobManagerParameters
+ .getRestServiceExposedType()
+ .serviceType()
+ .getType())
+ .withSelector(kubernetesJobManagerParameters.getSelectors())
+ .addNewPort()
+ .withName(Constants.REST_PORT_NAME)
+ .withPort(kubernetesJobManagerParameters.getRestPort())
+ .withNewTargetPort(kubernetesJobManagerParameters.getRestBindPort())
+ .endPort()
+ .endSpec()
+ .build();
+ }
+
+ /**
+ * Build up the internal service template, according to the jobManager parameters.
+ *
+ * @param kubernetesJobManagerParameters the parameters of jobManager.
+ * @return the internal service
+ */
+ public abstract Service buildUpInternalService(
+ KubernetesJobManagerParameters kubernetesJobManagerParameters);
+
+ /**
+ * Gets the type of the target kubernetes service.
+ *
+ * @return the type of the target kubernetes service.
+ */
+ public abstract String getType();
+
+ // Helper method
+ public static KubernetesConfigOptions.ServiceExposedType classify(Service service) {
+ KubernetesConfigOptions.ServiceExposedType type =
+ KubernetesConfigOptions.ServiceExposedType.valueOf(service.getSpec().getType());
+ if (type == KubernetesConfigOptions.ServiceExposedType.ClusterIP) {
+ if (HeadlessClusterIPService.HEADLESS_CLUSTER_IP.equals(
+ service.getSpec().getClusterIP())) {
+ type = KubernetesConfigOptions.ServiceExposedType.Headless_ClusterIP;
+ }
+ }
+ return type;
+ }
+}
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
index aa3bfa5..0d2db5e 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
@@ -82,8 +82,6 @@ public class Constants {
public static final String POD_IP_FIELD_PATH = "status.podIP";
- public static final String HEADLESS_SERVICE_CLUSTER_IP = "None";
-
public static final int MAXIMUM_CHARACTERS_OF_CLUSTER_ID = 45;
public static final String RESTART_POLICY_OF_NEVER = "Never";
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
index 6f46468..2d192aa 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesClientTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.Preconditions;
@@ -144,7 +145,8 @@ public class KubernetesClientTestBase extends KubernetesTestBase {
return buildExternalService(
KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
servicePort,
- serviceStatus);
+ serviceStatus,
+ false);
}
protected Service buildExternalServiceWithNodePort() {
@@ -162,7 +164,10 @@ public class KubernetesClientTestBase extends KubernetesTestBase {
.build();
return buildExternalService(
- KubernetesConfigOptions.ServiceExposedType.NodePort, servicePort, serviceStatus);
+ KubernetesConfigOptions.ServiceExposedType.NodePort,
+ servicePort,
+ serviceStatus,
+ false);
}
protected Service buildExternalServiceWithClusterIP() {
@@ -174,13 +179,26 @@ public class KubernetesClientTestBase extends KubernetesTestBase {
.build();
return buildExternalService(
- KubernetesConfigOptions.ServiceExposedType.ClusterIP, servicePort, null);
+ KubernetesConfigOptions.ServiceExposedType.ClusterIP, servicePort, null, false);
+ }
+
+ protected Service buildExternalServiceWithHeadlessClusterIP() {
+ final ServicePort servicePort =
+ new ServicePortBuilder()
+ .withName(Constants.REST_PORT_NAME)
+ .withPort(REST_PORT)
+ .withNewTargetPort(REST_PORT)
+ .build();
+
+ return buildExternalService(
+ KubernetesConfigOptions.ServiceExposedType.ClusterIP, servicePort, null, true);
}
private Service buildExternalService(
KubernetesConfigOptions.ServiceExposedType serviceExposedType,
ServicePort servicePort,
- @Nullable ServiceStatus serviceStatus) {
+ @Nullable ServiceStatus serviceStatus,
+ boolean isHeadlessSvc) {
final ServiceBuilder serviceBuilder =
new ServiceBuilder()
.editOrNewMetadata()
@@ -196,6 +214,12 @@ public class KubernetesClientTestBase extends KubernetesTestBase {
serviceBuilder.withStatus(serviceStatus);
}
+ if (isHeadlessSvc) {
+ serviceBuilder
+ .editOrNewSpec()
+ .withClusterIP(HeadlessClusterIPService.HEADLESS_CLUSTER_IP)
+ .endSpec();
+ }
return serviceBuilder.build();
}
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java
index 41af647..5de0b35 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/ExternalServiceDecoratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.kubeclient.decorators;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
import org.apache.flink.kubernetes.utils.Constants;
import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -119,4 +120,20 @@ public class ExternalServiceDecoratorTest extends KubernetesJobManagerTestBase {
KubernetesConfigOptions.ServiceExposedType.ClusterIP.name(),
((Service) servicesWithClusterIP.get(0)).getSpec().getType());
}
+
+ @Test
+ public void testSetServiceExposedTypeWithHeadless() throws IOException {
+
+ this.flinkConfig.set(
+ KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
+ KubernetesConfigOptions.ServiceExposedType.Headless_ClusterIP);
+ final List<HasMetadata> servicesWithHeadlessClusterIP =
+ this.externalServiceDecorator.buildAccompanyingKubernetesResources();
+ assertThat(
+ ((Service) servicesWithHeadlessClusterIP.get(0)).getSpec().getType(),
+ is(KubernetesConfigOptions.ServiceExposedType.ClusterIP.name()));
+ assertThat(
+ ((Service) servicesWithHeadlessClusterIP.get(0)).getSpec().getClusterIP(),
+ is(HeadlessClusterIPService.HEADLESS_CLUSTER_IP));
+ }
}
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
index f135040..d72e51a 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.kubernetes.kubeclient.decorators.FlinkConfMountDecorator
import org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.decorators.KerberosMountDecorator;
+import org.apache.flink.kubernetes.kubeclient.services.HeadlessClusterIPService;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -287,7 +288,7 @@ public class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBas
assertNull(resultInternalService.getSpec().getType());
assertEquals(
- Constants.HEADLESS_SERVICE_CLUSTER_IP,
+ HeadlessClusterIPService.HEADLESS_CLUSTER_IP,
resultInternalService.getSpec().getClusterIP());
assertEquals(2, resultInternalService.getSpec().getPorts().size());
assertEquals(3, resultInternalService.getSpec().getSelector().size());
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java
new file mode 100644
index 0000000..3a11ad6
--- /dev/null
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/services/ServiceTypeTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.services;
+
+import org.apache.flink.kubernetes.KubernetesClientTestBase;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+/** Tests for {@link ServiceType}. */
+public class ServiceTypeTest extends KubernetesClientTestBase {
+
+ @Test
+ public void testServiceClassify() {
+ assertThat(
+ ServiceType.classify(buildExternalServiceWithClusterIP()),
+ is(KubernetesConfigOptions.ServiceExposedType.ClusterIP));
+ assertThat(
+ ServiceType.classify(buildExternalServiceWithHeadlessClusterIP()),
+ is(KubernetesConfigOptions.ServiceExposedType.Headless_ClusterIP));
+ assertThat(
+ ServiceType.classify(buildExternalServiceWithNodePort()),
+ is(KubernetesConfigOptions.ServiceExposedType.NodePort));
+ assertThat(
+ ServiceType.classify(buildExternalServiceWithLoadBalancer("", "")),
+ is(KubernetesConfigOptions.ServiceExposedType.LoadBalancer));
+ }
+}