You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/08/10 02:57:58 UTC
[dubbo] branch 3.0-k8s updated: add Kubernetes registry support
(#6556)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0-k8s
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0-k8s by this push:
new 1583bc6 add Kubernetes registry support (#6556)
1583bc6 is described below
commit 1583bc63d02276b005bc2a11b72189540c3cf73b
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Mon Aug 10 10:57:38 2020 +0800
add Kubernetes registry support (#6556)
---
dubbo-all/pom.xml | 8 +
dubbo-dependencies-bom/pom.xml | 14 +
dubbo-registry/dubbo-registry-kubernetes/pom.xml | 73 ++++
.../kubernetes/KubernetesServiceDiscovery.java | 390 +++++++++++++++++++++
.../KubernetesServiceDiscoveryFactory.java | 28 ++
.../kubernetes/util/KubernetesClientConst.java | 76 ++++
.../kubernetes/util/KubernetesConfigUtils.java | 102 ++++++
...g.apache.dubbo.registry.client.ServiceDiscovery | 1 +
...e.dubbo.registry.client.ServiceDiscoveryFactory | 1 +
.../kubernetes/KubernetesServiceDiscoveryTest.java | 191 ++++++++++
.../org.mockito.plugins.MockMaker | 1 +
dubbo-registry/pom.xml | 1 +
12 files changed, 886 insertions(+)
diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml
index 050ae24..2adfe45 100644
--- a/dubbo-all/pom.xml
+++ b/dubbo-all/pom.xml
@@ -316,6 +316,13 @@
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-kubernetes</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-monitor-api</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
@@ -654,6 +661,7 @@
<include>org.apache.dubbo:dubbo-registry-nacos</include>
<include>org.apache.dubbo:dubbo-registry-sofa</include>
<include>org.apache.dubbo:dubbo-registry-multiple</include>
+ <include>org.apache.dubbo:dubbo-registry-kubernetes</include>
<include>org.apache.dubbo:dubbo-monitor-api</include>
<include>org.apache.dubbo:dubbo-monitor-default</include>
<include>org.apache.dubbo:dubbo-container-api</include>
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 9c40c96..f256b12 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -145,6 +145,9 @@
<!-- Eureka -->
<eureka.version>1.9.12</eureka.version>
+ <!-- Fabric8 for Kubernetes -->
+ <fabric8_kubernetes_version>4.10.3</fabric8_kubernetes_version>
+
<!-- Alibaba -->
<alibaba_spring_context_support_version>1.0.8</alibaba_spring_context_support_version>
@@ -703,6 +706,17 @@
<artifactId>grpc-grpclb</artifactId>
<version>${grpc.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-client</artifactId>
+ <version>${fabric8_kubernetes_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-server-mock</artifactId>
+ <scope>test</scope>
+ <version>${fabric8_kubernetes_version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/dubbo-registry/dubbo-registry-kubernetes/pom.xml b/dubbo-registry/dubbo-registry-kubernetes/pom.xml
new file mode 100644
index 0000000..799e45e
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry</artifactId>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>dubbo-registry-kubernetes</artifactId>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <description>The Kubernetes registry module of Dubbo project</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-registry-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-server-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>3.4.6</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <version>3.4.6</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
new file mode 100644
index 0000000..9b240f7
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.kubernetes;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
+import org.apache.dubbo.registry.kubernetes.util.KubernetesConfigUtils;
+
+import com.alibaba.fastjson.JSONObject;
+import io.fabric8.kubernetes.api.model.EndpointAddress;
+import io.fabric8.kubernetes.api.model.EndpointPort;
+import io.fabric8.kubernetes.api.model.EndpointSubset;
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class KubernetesServiceDiscovery implements ServiceDiscovery {
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private KubernetesClient kubernetesClient;
+
+ private String currentHostname;
+
+ private ServiceInstance localServiceInstance;
+
+ private URL registryURL;
+
+ private String namespace;
+
+ private boolean enableRegister;
+
+ public final static String KUBERNETES_PROPERTIES_KEY = "io.dubbo/metadata";
+
+ private final static ConcurrentHashMap<String, Watch> SERVICE_WATCHER = new ConcurrentHashMap<>(64);
+
+ private final static ConcurrentHashMap<String, Watch> PODS_WATCHER = new ConcurrentHashMap<>(64);
+
+ private final static ConcurrentHashMap<String, Watch> ENDPOINTS_WATCHER = new ConcurrentHashMap<>(64);
+
+ private final static ConcurrentHashMap<String, AtomicLong> SERVICE_UPDATE_TIME = new ConcurrentHashMap<>(64);
+
+ @Override
+ public void initialize(URL registryURL) throws Exception {
+ Config config = KubernetesConfigUtils.createKubernetesConfig(registryURL);
+ this.kubernetesClient = new DefaultKubernetesClient(config);
+ this.currentHostname = System.getenv("HOSTNAME");
+ this.registryURL = registryURL;
+ this.namespace = config.getNamespace();
+ this.enableRegister = registryURL.getParameter(KubernetesClientConst.ENABLE_REGISTER, true);
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ SERVICE_WATCHER.forEach((k, v) -> v.close());
+ SERVICE_WATCHER.clear();
+
+ PODS_WATCHER.forEach((k, v) -> v.close());
+ PODS_WATCHER.clear();
+
+ ENDPOINTS_WATCHER.forEach((k, v) -> v.close());
+ ENDPOINTS_WATCHER.clear();
+
+ kubernetesClient.close();
+ }
+
+ @Override
+ public void register(ServiceInstance serviceInstance) throws RuntimeException {
+ localServiceInstance = serviceInstance;
+
+ if (enableRegister) {
+ kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withName(currentHostname)
+ .edit()
+ .editOrNewMetadata()
+ .addToAnnotations(KUBERNETES_PROPERTIES_KEY, JSONObject.toJSONString(serviceInstance.getMetadata()))
+ .endMetadata()
+ .done();
+ if (logger.isInfoEnabled()) {
+ logger.info("Write Current Service Instance Metadata to Kubernetes pod. " +
+ "Current pod name: " + currentHostname);
+ }
+ }
+ }
+
+ @Override
+ public void update(ServiceInstance serviceInstance) throws RuntimeException {
+ register(serviceInstance);
+ }
+
+ @Override
+ public void unregister(ServiceInstance serviceInstance) throws RuntimeException {
+ localServiceInstance = null;
+
+ if (enableRegister) {
+ kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withName(currentHostname)
+ .edit()
+ .editOrNewMetadata()
+ .removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)
+ .endMetadata()
+ .done();
+ if (logger.isInfoEnabled()) {
+ logger.info("Remove Current Service Instance from Kubernetes pod. Current pod name: " + currentHostname);
+ }
+ }
+ }
+
+ @Override
+ public Set<String> getServices() {
+ return kubernetesClient
+ .services()
+ .inNamespace(namespace)
+ .list()
+ .getItems()
+ .stream()
+ .map(service -> service.getMetadata().getName())
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public ServiceInstance getLocalInstance() {
+ return localServiceInstance;
+ }
+
+ @Override
+ public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
+ Endpoints endpoints =
+ kubernetesClient
+ .endpoints()
+ .inNamespace(namespace)
+ .withName(serviceName)
+ .get();
+
+ return toServiceInstance(endpoints, serviceName);
+ }
+
+ @Override
+ public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
+ listener.getServiceNames().forEach(serviceName -> {
+ SERVICE_UPDATE_TIME.put(serviceName, new AtomicLong(0L));
+
+ // Watch Service Endpoint Modification
+ watchEndpoints(listener, serviceName);
+
+ // Watch Pods Modification, happens when ServiceInstance updated
+ watchPods(listener, serviceName);
+
+ // Watch Service Modification, happens when Service Selector updated, used to update pods watcher
+ watchService(listener, serviceName);
+ });
+ }
+
+ private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
+ Watch watch = kubernetesClient
+ .endpoints()
+ .inNamespace(namespace)
+ .withName(serviceName)
+ .watch(new Watcher<Endpoints>() {
+ @Override
+ public void eventReceived(Action action, Endpoints resource) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Endpoint Event. Event type: " + action.name() +
+ ". Current pod name: " + currentHostname);
+ }
+
+ notifyServiceChanged(serviceName, listener);
+ }
+
+ @Override
+ public void onClose(KubernetesClientException cause) {
+ // ignore
+ }
+ });
+
+ ENDPOINTS_WATCHER.put(serviceName, watch);
+ }
+
+ private void watchPods(ServiceInstancesChangedListener listener, String serviceName) {
+ Map<String, String> serviceSelector = getServiceSelector(serviceName);
+ if (serviceSelector == null) {
+ return;
+ }
+
+ Watch watch = kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withLabels(serviceSelector)
+ .watch(new Watcher<Pod>() {
+ @Override
+ public void eventReceived(Action action, Pod resource) {
+ if (Action.MODIFIED.equals(action)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Pods Update Event. Current pod name: " + currentHostname);
+ }
+
+ notifyServiceChanged(serviceName, listener);
+ }
+ }
+
+ @Override
+ public void onClose(KubernetesClientException cause) {
+ // ignore
+ }
+ });
+
+ PODS_WATCHER.put(serviceName, watch);
+ }
+
+ private void watchService(ServiceInstancesChangedListener listener, String serviceName) {
+ Watch watch = kubernetesClient
+ .services()
+ .inNamespace(namespace)
+ .withName(serviceName)
+ .watch(new Watcher<Service>() {
+ @Override
+ public void eventReceived(Action action, Service resource) {
+ if (Action.MODIFIED.equals(action)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received Service Update Event. Update Pods Watcher. " +
+ "Current pod name: " + currentHostname);
+ }
+
+ if (PODS_WATCHER.containsKey(serviceName)) {
+ PODS_WATCHER.get(serviceName).close();
+ PODS_WATCHER.remove(serviceName);
+ }
+ watchPods(listener, serviceName);
+ }
+ }
+
+ @Override
+ public void onClose(KubernetesClientException cause) {
+ // ignore
+ }
+ });
+
+ SERVICE_WATCHER.put(serviceName, watch);
+ }
+
+ private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener) {
+ long receivedTime = System.nanoTime();
+
+ ServiceInstancesChangedEvent event;
+
+ event = new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
+
+ AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
+ long lastUpdateTime = updateTime.get();
+
+ if (lastUpdateTime <= receivedTime) {
+ if (updateTime.compareAndSet(lastUpdateTime, receivedTime)) {
+ listener.onEvent(event);
+ return;
+ }
+ }
+
+ if (logger.isInfoEnabled()) {
+ logger.info("Discard Service Instance Data. " +
+ "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
+ "Current Data received time: " + receivedTime + ". " +
+ "Newer Data received time: " + lastUpdateTime + ".");
+ }
+ }
+
+ @Override
+ public URL getUrl() {
+ return registryURL;
+ }
+
+ private Map<String, String> getServiceSelector(String serviceName) {
+ Service service = kubernetesClient.services().inNamespace(namespace).withName(serviceName).get();
+ if (service == null) {
+ return null;
+ }
+ return service.getSpec().getSelector();
+ }
+
+ private List<ServiceInstance> toServiceInstance(Endpoints endpoints, String serviceName) {
+ Map<String, String> serviceSelector = getServiceSelector(serviceName);
+ if (serviceSelector == null) {
+ return new LinkedList<>();
+ }
+ Map<String, Pod> pods = kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withLabels(serviceSelector)
+ .list()
+ .getItems()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ pod -> pod.getMetadata().getName(),
+ pod -> pod));
+
+ List<ServiceInstance> instances = new LinkedList<>();
+ Set<Integer> instancePorts = new HashSet<>();
+
+ for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
+ instancePorts.addAll(
+ endpointSubset.getPorts()
+ .stream().map(EndpointPort::getPort)
+ .collect(Collectors.toSet()));
+ }
+
+ for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
+ for (EndpointAddress address : endpointSubset.getAddresses()) {
+ Pod pod = pods.get(address.getTargetRef().getName());
+ String ip = address.getIp();
+ if (pod == null) {
+ logger.warn("Unable to match Kubernetes Endpoint address with Pod. " +
+ "EndpointAddress Hostname: " + address.getTargetRef().getName());
+ continue;
+ }
+
+ instancePorts.forEach(port -> {
+ ServiceInstance serviceInstance = new DefaultServiceInstance(serviceName, ip, port);
+
+ String properties = pod.getMetadata().getAnnotations().get(KUBERNETES_PROPERTIES_KEY);
+ if (StringUtils.isNotEmpty(properties)) {
+ serviceInstance.getMetadata().putAll(JSONObject.parseObject(properties, Map.class));
+ instances.add(serviceInstance);
+ } else {
+ logger.warn("Unable to find Service Instance metadata in Pod Annotations. " +
+ "Possibly cause: provider has not been initialized successfully. " +
+ "EndpointAddress Hostname: " + address.getTargetRef().getName());
+ }
+ });
+ }
+ }
+
+ return instances;
+ }
+
+ /**
+ * UT used only
+ */
+ @Deprecated
+ public void setCurrentHostname(String currentHostname) {
+ this.currentHostname = currentHostname;
+ }
+
+ /**
+ * UT used only
+ */
+ @Deprecated
+ public void setKubernetesClient(KubernetesClient kubernetesClient) {
+ this.kubernetesClient = kubernetesClient;
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryFactory.java b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryFactory.java
new file mode 100644
index 0000000..88d50e2
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.kubernetes;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+public class KubernetesServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+ @Override
+ protected ServiceDiscovery createDiscovery(URL registryURL) {
+ return new KubernetesServiceDiscovery();
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesClientConst.java b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesClientConst.java
new file mode 100644
index 0000000..527d2fa
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesClientConst.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.kubernetes.util;
+
+public class KubernetesClientConst {
+
+ public final static String ENABLE_REGISTER = "enableRegister";
+
+ public final static String TRUST_CERTS = "trustCerts";
+
+ public final static String USE_HTTPS = "useHttps";
+
+ public static final String HTTP2_DISABLE = "http2Disable";
+
+ public final static String NAMESPACE = "namespace";
+
+ public final static String API_VERSION = "apiVersion";
+
+ public final static String CA_CERT_FILE = "caCertFile";
+
+ public final static String CA_CERT_DATA = "caCertData";
+
+ public final static String CLIENT_CERT_FILE = "clientCertFile";
+
+ public final static String CLIENT_CERT_DATA = "clientCertData";
+
+ public final static String CLIENT_KEY_FILE = "clientKeyFile";
+
+ public final static String CLIENT_KEY_DATA = "clientKeyData";
+
+ public final static String CLIENT_KEY_ALGO = "clientKeyAlgo";
+
+ public final static String CLIENT_KEY_PASSPHRASE = "clientKeyPassphrase";
+
+ public final static String OAUTH_TOKEN = "oauthToken";
+
+ public final static String USERNAME = "username";
+
+ public final static String PASSWORD = "password";
+
+ public final static String WATCH_RECONNECT_INTERVAL = "watchReconnectInterval";
+
+ public final static String WATCH_RECONNECT_LIMIT = "watchReconnectLimit";
+
+ public final static String CONNECTION_TIMEOUT = "connectionTimeout";
+
+ public final static String REQUEST_TIMEOUT = "requestTimeout";
+
+ public final static String ROLLING_TIMEOUT = "rollingTimeout";
+
+ public final static String LOGGING_INTERVAL = "loggingInterval";
+
+ public final static String HTTP_PROXY = "httpProxy";
+
+ public final static String HTTPS_PROXY = "httpsProxy";
+
+ public final static String PROXY_USERNAME = "proxyUsername";
+
+ public final static String PROXY_PASSWORD = "proxyPassword";
+
+ public final static String NO_PROXY = "noProxy";
+}
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesConfigUtils.java b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesConfigUtils.java
new file mode 100644
index 0000000..464e985
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesConfigUtils.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.kubernetes.util;
+
+import org.apache.dubbo.common.URL;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+
+public class KubernetesConfigUtils {
+
+ public static Config createKubernetesConfig(URL url) {
+ // Init default config
+ Config base = Config.autoConfigure(null);
+
+ // replace config with parameters if presents
+ return new ConfigBuilder(base)
+ .withMasterUrl(buildMasterUrl(url))
+ .withApiVersion(url.getParameter(KubernetesClientConst.API_VERSION,
+ base.getApiVersion()))
+ .withNamespace(url.getParameter(KubernetesClientConst.NAMESPACE,
+ base.getNamespace()))
+ .withUsername(url.getParameter(KubernetesClientConst.USERNAME,
+ base.getUsername()))
+ .withPassword(url.getParameter(KubernetesClientConst.PASSWORD,
+ base.getPassword()))
+
+ .withOauthToken(url.getParameter(KubernetesClientConst.OAUTH_TOKEN,
+ base.getOauthToken()))
+
+ .withCaCertFile(url.getParameter(KubernetesClientConst.CA_CERT_FILE,
+ base.getCaCertFile()))
+ .withCaCertData(url.getParameter(KubernetesClientConst.CA_CERT_DATA,
+ base.getCaCertData()))
+
+ .withClientKeyFile(url.getParameter(KubernetesClientConst.CLIENT_KEY_FILE,
+ base.getClientKeyFile()))
+ .withClientKeyData(url.getParameter(KubernetesClientConst.CLIENT_KEY_DATA,
+ base.getClientKeyData()))
+
+ .withClientCertFile(url.getParameter(KubernetesClientConst.CLIENT_CERT_FILE,
+ base.getClientCertFile()))
+ .withClientCertData(url.getParameter(KubernetesClientConst.CLIENT_CERT_DATA,
+ base.getClientCertData()))
+
+ .withClientKeyAlgo(url.getParameter(KubernetesClientConst.CLIENT_KEY_ALGO,
+ base.getClientKeyAlgo()))
+ .withClientKeyPassphrase(url.getParameter(KubernetesClientConst.CLIENT_KEY_PASSPHRASE,
+ base.getClientKeyPassphrase()))
+
+ .withConnectionTimeout(url.getParameter(KubernetesClientConst.CONNECTION_TIMEOUT,
+ base.getConnectionTimeout()))
+ .withRequestTimeout(url.getParameter(KubernetesClientConst.REQUEST_TIMEOUT,
+ base.getRequestTimeout()))
+ .withRollingTimeout(url.getParameter(KubernetesClientConst.ROLLING_TIMEOUT,
+ base.getRollingTimeout()))
+
+ .withWatchReconnectInterval(url.getParameter(KubernetesClientConst.WATCH_RECONNECT_INTERVAL,
+ base.getWatchReconnectInterval()))
+ .withWatchReconnectLimit(url.getParameter(KubernetesClientConst.WATCH_RECONNECT_LIMIT,
+ base.getWatchReconnectLimit()))
+ .withLoggingInterval(url.getParameter(KubernetesClientConst.LOGGING_INTERVAL,
+ base.getLoggingInterval()))
+
+ .withTrustCerts(url.getParameter(KubernetesClientConst.TRUST_CERTS,
+ base.isTrustCerts()))
+ .withHttp2Disable(url.getParameter(KubernetesClientConst.HTTP2_DISABLE,
+ base.isTrustCerts()))
+
+ .withHttpProxy(url.getParameter(KubernetesClientConst.HTTP_PROXY,
+ base.getHttpProxy()))
+ .withHttpsProxy(url.getParameter(KubernetesClientConst.HTTPS_PROXY,
+ base.getHttpsProxy()))
+ .withProxyUsername(url.getParameter(KubernetesClientConst.PROXY_USERNAME,
+ base.getProxyUsername()))
+ .withProxyPassword(url.getParameter(KubernetesClientConst.PROXY_PASSWORD,
+ base.getProxyPassword()))
+ .withNoProxy(url.getParameter(KubernetesClientConst.NO_PROXY,
+ base.getNoProxy()))
+ .build();
+ }
+
+ private static String buildMasterUrl(URL url) {
+ return (url.getParameter(KubernetesClientConst.USE_HTTPS, true) ?
+ "https://" : "http://" )
+ + url.getHost() + ":" + url.getPort();
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery b/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
new file mode 100644
index 0000000..3e1b88e
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
@@ -0,0 +1 @@
+kubernetes=org.apache.dubbo.registry.kubernetes.KubernetesServiceDiscovery
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..4301ab8
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+kubernetes=org.apache.dubbo.registry.kubernetes.KubernetesServiceDiscoveryFactory
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
new file mode 100644
index 0000000..d3d8c9b
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.kubernetes;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
+
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.EndpointsBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+@ExtendWith({MockitoExtension.class})
+public class KubernetesServiceDiscoveryTest {
+ public KubernetesServer mockServer = new KubernetesServer(true, true);
+
+ private KubernetesClient mockClient;
+
+ private ServiceInstancesChangedListener mockListener = Mockito.mock(ServiceInstancesChangedListener.class);
+
+ private URL serverUrl;
+
+ private Map<String, String> selector;
+
+ @BeforeEach
+ public void setUp() {
+ mockServer.before();
+ mockClient = mockServer.getClient();
+
+ serverUrl = URL.valueOf(mockClient.getConfiguration().getMasterUrl())
+ .setProtocol("kubernetes")
+ .addParameter(KubernetesClientConst.TRUST_CERTS, "true")
+ .addParameter(KubernetesClientConst.HTTP2_DISABLE, "true");
+
+ System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false");
+ System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false");
+
+ selector = new HashMap<>(4);
+ selector.put("l", "v");
+ Pod pod = new PodBuilder()
+ .withNewMetadata().withName("TestServer").withLabels(selector).endMetadata()
+ .build();
+
+ Service service = new ServiceBuilder()
+ .withNewMetadata().withName("TestService").endMetadata()
+ .withNewSpec().withSelector(selector).endSpec().build();
+
+ Endpoints endPoints = new EndpointsBuilder()
+ .withNewMetadata().withName("TestService").endMetadata()
+ .addNewSubset()
+ .addNewAddress().withIp("ip1")
+ .withNewTargetRef().withUid("uid1").withName("TestServer").endTargetRef().endAddress()
+ .addNewPort("Test", "Test", 12345, "TCP").endSubset()
+ .build();
+
+ mockClient.pods().create(pod);
+ mockClient.services().create(service);
+ mockClient.endpoints().create(endPoints);
+ }
+
+ @AfterEach
+ public void destroy() {
+ mockServer.after();
+ }
+
+ @Test
+ public void testEndpointsUpdate() throws Exception {
+
+ KubernetesServiceDiscovery serviceDiscovery = new KubernetesServiceDiscovery();
+ serviceDiscovery.initialize(serverUrl);
+
+ serviceDiscovery.setCurrentHostname("TestServer");
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345);
+ serviceDiscovery.register(serviceInstance);
+
+ HashSet<String> serviceList = new HashSet<>(4);
+ serviceList.add("TestService");
+ Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+ Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+ serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+ mockClient.endpoints().withName("TestService").edit().editFirstSubset()
+ .addNewAddress().withIp("ip2")
+ .withNewTargetRef().withUid("uid2").withName("TestServer").endTargetRef().endAddress()
+ .addNewPort("Test", "Test", 12345, "TCP").endSubset()
+ .done();
+
+ Thread.sleep(5000);
+ ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+ ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+ Mockito.verify(mockListener).onEvent(eventArgumentCaptor.capture());
+ Assertions.assertEquals(2, eventArgumentCaptor.getValue().getServiceInstances().size());
+
+ serviceDiscovery.unregister(serviceInstance);
+
+ serviceDiscovery.destroy();
+ }
+
+ @Test
+ public void testPodsUpdate() throws Exception {
+
+ KubernetesServiceDiscovery serviceDiscovery = new KubernetesServiceDiscovery();
+ serviceDiscovery.initialize(serverUrl);
+
+ serviceDiscovery.setCurrentHostname("TestServer");
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345);
+ serviceDiscovery.register(serviceInstance);
+
+ HashSet<String> serviceList = new HashSet<>(4);
+ serviceList.add("TestService");
+ Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+ Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+ serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+
+ serviceInstance = new DefaultServiceInstance("TestService", "Test12345", 12345);
+ serviceDiscovery.update(serviceInstance);
+
+ Thread.sleep(5000);
+ ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+ ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+ Mockito.verify(mockListener).onEvent(eventArgumentCaptor.capture());
+ Assertions.assertEquals(1, eventArgumentCaptor.getValue().getServiceInstances().size());
+
+ serviceDiscovery.unregister(serviceInstance);
+
+ serviceDiscovery.destroy();
+ }
+
+ @Test
+ public void testGetInstance() throws Exception {
+ KubernetesServiceDiscovery serviceDiscovery = new KubernetesServiceDiscovery();
+ serviceDiscovery.initialize(serverUrl);
+
+ serviceDiscovery.setCurrentHostname("TestServer");
+ serviceDiscovery.setKubernetesClient(mockClient);
+
+ ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345);
+ serviceDiscovery.register(serviceInstance);
+
+ serviceDiscovery.update(serviceInstance);
+
+ Assertions.assertEquals(1, serviceDiscovery.getServices().size());
+ Assertions.assertEquals(1, serviceDiscovery.getInstances("TestService").size());
+
+ Assertions.assertEquals(serviceInstance, serviceDiscovery.getLocalInstance());
+
+ serviceDiscovery.unregister(serviceInstance);
+
+ serviceDiscovery.destroy();
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/dubbo-registry/dubbo-registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..ca6ee9c
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
\ No newline at end of file
diff --git a/dubbo-registry/pom.xml b/dubbo-registry/pom.xml
index 5ab688a..92f691b 100644
--- a/dubbo-registry/pom.xml
+++ b/dubbo-registry/pom.xml
@@ -34,6 +34,7 @@
<!-- <module>dubbo-registry-default</module>-->
<!-- <module>dubbo-registry-multicast</module>-->
<module>dubbo-registry-zookeeper</module>
+ <module>dubbo-registry-kubernetes</module>
<!-- <module>dubbo-registry-redis</module>-->
<!-- <module>dubbo-registry-consul</module>-->
<!-- <module>dubbo-registry-etcd3</module>-->