You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/08/10 02:57:58 UTC

[dubbo] branch 3.0-k8s updated: add Kubernetes registry support (#6556)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0-k8s
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0-k8s by this push:
     new 1583bc6  add Kubernetes registry support (#6556)
1583bc6 is described below

commit 1583bc63d02276b005bc2a11b72189540c3cf73b
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Mon Aug 10 10:57:38 2020 +0800

    add Kubernetes registry support (#6556)
---
 dubbo-all/pom.xml                                  |   8 +
 dubbo-dependencies-bom/pom.xml                     |  14 +
 dubbo-registry/dubbo-registry-kubernetes/pom.xml   |  73 ++++
 .../kubernetes/KubernetesServiceDiscovery.java     | 390 +++++++++++++++++++++
 .../KubernetesServiceDiscoveryFactory.java         |  28 ++
 .../kubernetes/util/KubernetesClientConst.java     |  76 ++++
 .../kubernetes/util/KubernetesConfigUtils.java     | 102 ++++++
 ...g.apache.dubbo.registry.client.ServiceDiscovery |   1 +
 ...e.dubbo.registry.client.ServiceDiscoveryFactory |   1 +
 .../kubernetes/KubernetesServiceDiscoveryTest.java | 191 ++++++++++
 .../org.mockito.plugins.MockMaker                  |   1 +
 dubbo-registry/pom.xml                             |   1 +
 12 files changed, 886 insertions(+)

diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml
index 050ae24..2adfe45 100644
--- a/dubbo-all/pom.xml
+++ b/dubbo-all/pom.xml
@@ -316,6 +316,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-kubernetes</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
             <artifactId>dubbo-monitor-api</artifactId>
             <version>${project.version}</version>
             <scope>compile</scope>
@@ -654,6 +661,7 @@
                                     <include>org.apache.dubbo:dubbo-registry-nacos</include>
                                     <include>org.apache.dubbo:dubbo-registry-sofa</include>
                                     <include>org.apache.dubbo:dubbo-registry-multiple</include>
+                                    <include>org.apache.dubbo:dubbo-registry-kubernetes</include>
                                     <include>org.apache.dubbo:dubbo-monitor-api</include>
                                     <include>org.apache.dubbo:dubbo-monitor-default</include>
                                     <include>org.apache.dubbo:dubbo-container-api</include>
diff --git a/dubbo-dependencies-bom/pom.xml b/dubbo-dependencies-bom/pom.xml
index 9c40c96..f256b12 100644
--- a/dubbo-dependencies-bom/pom.xml
+++ b/dubbo-dependencies-bom/pom.xml
@@ -145,6 +145,9 @@
         <!-- Eureka -->
         <eureka.version>1.9.12</eureka.version>
 
+        <!-- Fabric8 for Kubernetes -->
+        <fabric8_kubernetes_version>4.10.3</fabric8_kubernetes_version>
+
         <!-- Alibaba -->
         <alibaba_spring_context_support_version>1.0.8</alibaba_spring_context_support_version>
 
@@ -703,6 +706,17 @@
                 <artifactId>grpc-grpclb</artifactId>
                 <version>${grpc.version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.fabric8</groupId>
+                <artifactId>kubernetes-client</artifactId>
+                <version>${fabric8_kubernetes_version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.fabric8</groupId>
+                <artifactId>kubernetes-server-mock</artifactId>
+                <scope>test</scope>
+                <version>${fabric8_kubernetes_version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/dubbo-registry/dubbo-registry-kubernetes/pom.xml b/dubbo-registry/dubbo-registry-kubernetes/pom.xml
new file mode 100644
index 0000000..799e45e
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.dubbo</groupId>
+        <artifactId>dubbo-registry</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>dubbo-registry-kubernetes</artifactId>
+    <packaging>jar</packaging>
+    <name>${project.artifactId}</name>
+    <description>The Kubernetes registry module of Dubbo project</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-registry-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-client</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-server-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>3.4.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>3.4.6</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
new file mode 100644
index 0000000..9b240f7
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscovery.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.kubernetes;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
+import org.apache.dubbo.registry.kubernetes.util.KubernetesConfigUtils;
+
+import com.alibaba.fastjson.JSONObject;
+import io.fabric8.kubernetes.api.model.EndpointAddress;
+import io.fabric8.kubernetes.api.model.EndpointPort;
+import io.fabric8.kubernetes.api.model.EndpointSubset;
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+public class KubernetesServiceDiscovery implements ServiceDiscovery {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private KubernetesClient kubernetesClient;
+
+    private String currentHostname;
+
+    private ServiceInstance localServiceInstance;
+
+    private URL registryURL;
+
+    private String namespace;
+
+    private boolean enableRegister;
+
+    public final static String KUBERNETES_PROPERTIES_KEY = "io.dubbo/metadata";
+
+    private final static ConcurrentHashMap<String, Watch> SERVICE_WATCHER = new ConcurrentHashMap<>(64);
+
+    private final static ConcurrentHashMap<String, Watch> PODS_WATCHER = new ConcurrentHashMap<>(64);
+
+    private final static ConcurrentHashMap<String, Watch> ENDPOINTS_WATCHER = new ConcurrentHashMap<>(64);
+
+    private final static ConcurrentHashMap<String, AtomicLong> SERVICE_UPDATE_TIME = new ConcurrentHashMap<>(64);
+
+    @Override
+    public void initialize(URL registryURL) throws Exception {
+        Config config = KubernetesConfigUtils.createKubernetesConfig(registryURL);
+        this.kubernetesClient = new DefaultKubernetesClient(config);
+        this.currentHostname = System.getenv("HOSTNAME");
+        this.registryURL = registryURL;
+        this.namespace = config.getNamespace();
+        this.enableRegister = registryURL.getParameter(KubernetesClientConst.ENABLE_REGISTER, true);
+    }
+
+    @Override
+    public void destroy() throws Exception {
+        SERVICE_WATCHER.forEach((k, v) -> v.close());
+        SERVICE_WATCHER.clear();
+
+        PODS_WATCHER.forEach((k, v) -> v.close());
+        PODS_WATCHER.clear();
+
+        ENDPOINTS_WATCHER.forEach((k, v) -> v.close());
+        ENDPOINTS_WATCHER.clear();
+
+        kubernetesClient.close();
+    }
+
+    @Override
+    public void register(ServiceInstance serviceInstance) throws RuntimeException {
+        localServiceInstance = serviceInstance;
+
+        if (enableRegister) {
+            kubernetesClient
+                    .pods()
+                    .inNamespace(namespace)
+                    .withName(currentHostname)
+                    .edit()
+                    .editOrNewMetadata()
+                    .addToAnnotations(KUBERNETES_PROPERTIES_KEY, JSONObject.toJSONString(serviceInstance.getMetadata()))
+                    .endMetadata()
+                    .done();
+            if (logger.isInfoEnabled()) {
+                logger.info("Write Current Service Instance Metadata to Kubernetes pod. " +
+                        "Current pod name: " + currentHostname);
+            }
+        }
+    }
+
+    @Override
+    public void update(ServiceInstance serviceInstance) throws RuntimeException {
+        register(serviceInstance);
+    }
+
+    @Override
+    public void unregister(ServiceInstance serviceInstance) throws RuntimeException {
+        localServiceInstance = null;
+
+        if (enableRegister) {
+            kubernetesClient
+                    .pods()
+                    .inNamespace(namespace)
+                    .withName(currentHostname)
+                    .edit()
+                    .editOrNewMetadata()
+                    .removeFromAnnotations(KUBERNETES_PROPERTIES_KEY)
+                    .endMetadata()
+                    .done();
+            if (logger.isInfoEnabled()) {
+                logger.info("Remove Current Service Instance from Kubernetes pod. Current pod name: " + currentHostname);
+            }
+        }
+    }
+
+    @Override
+    public Set<String> getServices() {
+        return kubernetesClient
+                .services()
+                .inNamespace(namespace)
+                .list()
+                .getItems()
+                .stream()
+                .map(service -> service.getMetadata().getName())
+                .collect(Collectors.toSet());
+    }
+
+    @Override
+    public ServiceInstance getLocalInstance() {
+        return localServiceInstance;
+    }
+
+    @Override
+    public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
+        Endpoints endpoints =
+                kubernetesClient
+                        .endpoints()
+                        .inNamespace(namespace)
+                        .withName(serviceName)
+                        .get();
+
+        return toServiceInstance(endpoints, serviceName);
+    }
+
+    @Override
+    public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
+        listener.getServiceNames().forEach(serviceName -> {
+            SERVICE_UPDATE_TIME.put(serviceName, new AtomicLong(0L));
+
+            // Watch Service Endpoint Modification
+            watchEndpoints(listener, serviceName);
+
+            // Watch Pods Modification, happens when ServiceInstance updated
+            watchPods(listener, serviceName);
+
+            // Watch Service Modification, happens when Service Selector updated, used to update pods watcher
+            watchService(listener, serviceName);
+        });
+    }
+
+    private void watchEndpoints(ServiceInstancesChangedListener listener, String serviceName) {
+        Watch watch = kubernetesClient
+                .endpoints()
+                .inNamespace(namespace)
+                .withName(serviceName)
+                .watch(new Watcher<Endpoints>() {
+                    @Override
+                    public void eventReceived(Action action, Endpoints resource) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Received Endpoint Event. Event type: " + action.name() +
+                                    ". Current pod name: " + currentHostname);
+                        }
+
+                        notifyServiceChanged(serviceName, listener);
+                    }
+
+                    @Override
+                    public void onClose(KubernetesClientException cause) {
+                        // ignore
+                    }
+                });
+
+        ENDPOINTS_WATCHER.put(serviceName, watch);
+    }
+
+    private void watchPods(ServiceInstancesChangedListener listener, String serviceName) {
+        Map<String, String> serviceSelector = getServiceSelector(serviceName);
+        if (serviceSelector == null) {
+            return;
+        }
+
+        Watch watch = kubernetesClient
+                .pods()
+                .inNamespace(namespace)
+                .withLabels(serviceSelector)
+                .watch(new Watcher<Pod>() {
+                    @Override
+                    public void eventReceived(Action action, Pod resource) {
+                        if (Action.MODIFIED.equals(action)) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Received Pods Update Event. Current pod name: " + currentHostname);
+                            }
+
+                            notifyServiceChanged(serviceName, listener);
+                        }
+                    }
+
+                    @Override
+                    public void onClose(KubernetesClientException cause) {
+                        // ignore
+                    }
+                });
+
+        PODS_WATCHER.put(serviceName, watch);
+    }
+
+    /**
+     * Watches the Service named {@code serviceName}; on a MODIFIED event the pods watcher is
+     * re-created so that it is built from the service's current selector.
+     */
+    private void watchService(ServiceInstancesChangedListener listener, String serviceName) {
+        Watch watch = kubernetesClient.services().inNamespace(namespace).withName(serviceName)
+                .watch(new Watcher<Service>() {
+                    @Override
+                    public void eventReceived(Action action, Service resource) {
+                        if (Action.MODIFIED.equals(action)) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Received Service Update Event. Update Pods Watcher. " +
+                                        "Current pod name: " + currentHostname);
+                            }
+                            // remove-then-close in one step: the map is shared and may be cleared concurrently
+                            Watch previous = PODS_WATCHER.remove(serviceName);
+                            if (previous != null) {
+                                previous.close();
+                            }
+                            watchPods(listener, serviceName);
+                        }
+                    }
+
+                    @Override
+                    public void onClose(KubernetesClientException cause) {
+                        // ignore
+                    }
+                });
+        SERVICE_WATCHER.put(serviceName, watch);
+    }
+
+    private void notifyServiceChanged(String serviceName, ServiceInstancesChangedListener listener) {
+        long receivedTime = System.nanoTime();
+
+        ServiceInstancesChangedEvent event;
+
+        event = new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName));
+
+        AtomicLong updateTime = SERVICE_UPDATE_TIME.get(serviceName);
+        long lastUpdateTime = updateTime.get();
+
+        if (lastUpdateTime <= receivedTime) {
+            if (updateTime.compareAndSet(lastUpdateTime, receivedTime)) {
+                listener.onEvent(event);
+                return;
+            }
+        }
+
+        if (logger.isInfoEnabled()) {
+            logger.info("Discard Service Instance Data. " +
+                    "Possible Cause: Newer message has been processed or Failed to update time record by CAS. " +
+                    "Current Data received time: " + receivedTime + ". " +
+                    "Newer Data received time: " + lastUpdateTime + ".");
+        }
+    }
+
+    @Override
+    public URL getUrl() {
+        return registryURL;
+    }
+
+    private Map<String, String> getServiceSelector(String serviceName) {
+        Service service = kubernetesClient.services().inNamespace(namespace).withName(serviceName).get();
+        if (service == null) {
+            return null;
+        }
+        return service.getSpec().getSelector();
+    }
+
+    /**
+     * Converts {@code endpoints} into service instances: one per endpoint address and collected endpoint port.
+     * Addresses without a matching pod, or whose pod carries no metadata annotation, are skipped with a warning.
+     *
+     * @param endpoints   endpoints of the service; may be {@code null}, in which case no instances are returned
+     * @param serviceName name of the Kubernetes service the endpoints belong to
+     * @return the resolved instances; empty when the service or its endpoints are missing
+     */
+    private List<ServiceInstance> toServiceInstance(Endpoints endpoints, String serviceName) {
+        if (endpoints == null) {
+            return new LinkedList<>();
+        }
+        Map<String, String> serviceSelector = getServiceSelector(serviceName);
+        if (serviceSelector == null) {
+            return new LinkedList<>();
+        }
+        Map<String, Pod> pods = kubernetesClient.pods().inNamespace(namespace).withLabels(serviceSelector)
+                .list().getItems().stream()
+                .collect(Collectors.toMap(pod -> pod.getMetadata().getName(), pod -> pod));
+
+        // Ports are gathered over all subsets first; every address below is paired with each of them.
+        Set<Integer> instancePorts = new HashSet<>();
+        for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
+            endpointSubset.getPorts().stream().map(EndpointPort::getPort).forEach(instancePorts::add);
+        }
+
+        List<ServiceInstance> instances = new LinkedList<>();
+        for (EndpointSubset endpointSubset : endpoints.getSubsets()) {
+            for (EndpointAddress address : endpointSubset.getAddresses()) {
+                // targetRef is optional on an EndpointAddress, so it must not be dereferenced blindly
+                String podName = address.getTargetRef() == null ? null : address.getTargetRef().getName();
+                Pod pod = podName == null ? null : pods.get(podName);
+                if (pod == null) {
+                    logger.warn("Unable to match Kubernetes Endpoint address with Pod. " +
+                            "EndpointAddress Hostname: " + podName);
+                    continue;
+                }
+                // a pod without any annotation may report a null map instead of an empty one
+                Map<String, String> annotations = pod.getMetadata().getAnnotations();
+                String properties = annotations == null ? null : annotations.get(KUBERNETES_PROPERTIES_KEY);
+                if (StringUtils.isNotEmpty(properties)) {
+                    for (Integer port : instancePorts) {
+                        ServiceInstance serviceInstance = new DefaultServiceInstance(serviceName, address.getIp(), port);
+                        serviceInstance.getMetadata().putAll(JSONObject.parseObject(properties, Map.class));
+                        instances.add(serviceInstance);
+                    }
+                } else {
+                    logger.warn("Unable to find Service Instance metadata in Pod Annotations. " +
+                            "Possibly cause: provider has not been initialized successfully. " +
+                            "EndpointAddress Hostname: " + podName);
+                }
+            }
+        }
+        return instances;
+    }
+
+    /**
+     * UT used only
+     */
+    @Deprecated
+    public void setCurrentHostname(String currentHostname) {
+        this.currentHostname = currentHostname;
+    }
+
+    /**
+     * UT used only
+     */
+    @Deprecated
+    public void setKubernetesClient(KubernetesClient kubernetesClient) {
+        this.kubernetesClient = kubernetesClient;
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryFactory.java b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryFactory.java
new file mode 100644
index 0000000..88d50e2
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.kubernetes;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+public class KubernetesServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+    @Override
+    protected ServiceDiscovery createDiscovery(URL registryURL) {
+        return new KubernetesServiceDiscovery();
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesClientConst.java b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesClientConst.java
new file mode 100644
index 0000000..527d2fa
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesClientConst.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.kubernetes.util;
+
+public class KubernetesClientConst {
+
+    public final static String ENABLE_REGISTER = "enableRegister";
+
+    public final static String TRUST_CERTS = "trustCerts";
+
+    public final static String USE_HTTPS = "useHttps";
+
+    public static final String HTTP2_DISABLE = "http2Disable";
+
+    public final static String NAMESPACE = "namespace";
+
+    public final static String API_VERSION = "apiVersion";
+
+    public final static String CA_CERT_FILE = "caCertFile";
+
+    public final static String CA_CERT_DATA = "caCertData";
+
+    public final static String CLIENT_CERT_FILE = "clientCertFile";
+
+    public final static String CLIENT_CERT_DATA = "clientCertData";
+
+    public final static String CLIENT_KEY_FILE = "clientKeyFile";
+
+    public final static String CLIENT_KEY_DATA = "clientKeyData";
+
+    public final static String CLIENT_KEY_ALGO = "clientKeyAlgo";
+
+    public final static String CLIENT_KEY_PASSPHRASE = "clientKeyPassphrase";
+
+    public final static String OAUTH_TOKEN = "oauthToken";
+
+    public final static String USERNAME = "username";
+
+    public final static String PASSWORD = "password";
+
+    public final static String WATCH_RECONNECT_INTERVAL = "watchReconnectInterval";
+
+    public final static String WATCH_RECONNECT_LIMIT = "watchReconnectLimit";
+
+    public final static String CONNECTION_TIMEOUT = "connectionTimeout";
+
+    public final static String REQUEST_TIMEOUT = "requestTimeout";
+
+    public final static String ROLLING_TIMEOUT = "rollingTimeout";
+
+    public final static String LOGGING_INTERVAL = "loggingInterval";
+
+    public final static String HTTP_PROXY = "httpProxy";
+
+    public final static String HTTPS_PROXY = "httpsProxy";
+
+    public final static String PROXY_USERNAME = "proxyUsername";
+
+    public final static String PROXY_PASSWORD = "proxyPassword";
+
+    public final static String NO_PROXY = "noProxy";
+}
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesConfigUtils.java b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesConfigUtils.java
new file mode 100644
index 0000000..464e985
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/java/org/apache/dubbo/registry/kubernetes/util/KubernetesConfigUtils.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.kubernetes.util;
+
+import org.apache.dubbo.common.URL;
+
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+
+/**
+ * Builds the fabric8 client {@link Config} used by the Kubernetes registry from a Dubbo registry {@link URL}.
+ */
+public class KubernetesConfigUtils {
+
+    /**
+     * Creates the client config for {@code url}. Every setting starts from fabric8's auto-configured
+     * value and is overridden only when the matching parameter is present on the registry URL.
+     * The master URL is always derived from the URL's host and port.
+     */
+    public static Config createKubernetesConfig(URL url) {
+        // Init default config
+        Config base = Config.autoConfigure(null);
+
+        // replace config with parameters if present
+        return new ConfigBuilder(base)
+                .withMasterUrl(buildMasterUrl(url))
+                .withApiVersion(url.getParameter(KubernetesClientConst.API_VERSION,
+                        base.getApiVersion()))
+                .withNamespace(url.getParameter(KubernetesClientConst.NAMESPACE,
+                        base.getNamespace()))
+                .withUsername(url.getParameter(KubernetesClientConst.USERNAME,
+                        base.getUsername()))
+                .withPassword(url.getParameter(KubernetesClientConst.PASSWORD,
+                        base.getPassword()))
+                .withOauthToken(url.getParameter(KubernetesClientConst.OAUTH_TOKEN,
+                        base.getOauthToken()))
+                .withCaCertFile(url.getParameter(KubernetesClientConst.CA_CERT_FILE,
+                        base.getCaCertFile()))
+                .withCaCertData(url.getParameter(KubernetesClientConst.CA_CERT_DATA,
+                        base.getCaCertData()))
+                .withClientKeyFile(url.getParameter(KubernetesClientConst.CLIENT_KEY_FILE,
+                        base.getClientKeyFile()))
+                .withClientKeyData(url.getParameter(KubernetesClientConst.CLIENT_KEY_DATA,
+                        base.getClientKeyData()))
+                .withClientCertFile(url.getParameter(KubernetesClientConst.CLIENT_CERT_FILE,
+                        base.getClientCertFile()))
+                .withClientCertData(url.getParameter(KubernetesClientConst.CLIENT_CERT_DATA,
+                        base.getClientCertData()))
+                .withClientKeyAlgo(url.getParameter(KubernetesClientConst.CLIENT_KEY_ALGO,
+                        base.getClientKeyAlgo()))
+                .withClientKeyPassphrase(url.getParameter(KubernetesClientConst.CLIENT_KEY_PASSPHRASE,
+                        base.getClientKeyPassphrase()))
+                .withConnectionTimeout(url.getParameter(KubernetesClientConst.CONNECTION_TIMEOUT,
+                        base.getConnectionTimeout()))
+                .withRequestTimeout(url.getParameter(KubernetesClientConst.REQUEST_TIMEOUT,
+                        base.getRequestTimeout()))
+                .withRollingTimeout(url.getParameter(KubernetesClientConst.ROLLING_TIMEOUT,
+                        base.getRollingTimeout()))
+                .withWatchReconnectInterval(url.getParameter(KubernetesClientConst.WATCH_RECONNECT_INTERVAL,
+                        base.getWatchReconnectInterval()))
+                .withWatchReconnectLimit(url.getParameter(KubernetesClientConst.WATCH_RECONNECT_LIMIT,
+                        base.getWatchReconnectLimit()))
+                .withLoggingInterval(url.getParameter(KubernetesClientConst.LOGGING_INTERVAL,
+                        base.getLoggingInterval()))
+                .withTrustCerts(url.getParameter(KubernetesClientConst.TRUST_CERTS,
+                        base.isTrustCerts()))
+                .withHttp2Disable(url.getParameter(KubernetesClientConst.HTTP2_DISABLE,
+                        base.isHttp2Disable()))
+                .withHttpProxy(url.getParameter(KubernetesClientConst.HTTP_PROXY,
+                        base.getHttpProxy()))
+                .withHttpsProxy(url.getParameter(KubernetesClientConst.HTTPS_PROXY,
+                        base.getHttpsProxy()))
+                .withProxyUsername(url.getParameter(KubernetesClientConst.PROXY_USERNAME,
+                        base.getProxyUsername()))
+                .withProxyPassword(url.getParameter(KubernetesClientConst.PROXY_PASSWORD,
+                        base.getProxyPassword()))
+                .withNoProxy(url.getParameter(KubernetesClientConst.NO_PROXY,
+                        base.getNoProxy()))
+                .build();
+    }
+
+    /** Builds the API server address from the URL host and port; HTTPS unless {@code useHttps=false}. */
+    private static String buildMasterUrl(URL url) {
+        return (url.getParameter(KubernetesClientConst.USE_HTTPS, true) ?
+                "https://" : "http://" )
+                + url.getHost() + ":" + url.getPort();
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery b/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
new file mode 100644
index 0000000..3e1b88e
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
@@ -0,0 +1 @@
+kubernetes=org.apache.dubbo.registry.kubernetes.KubernetesServiceDiscovery
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..4301ab8
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+kubernetes=org.apache.dubbo.registry.kubernetes.KubernetesServiceDiscoveryFactory
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java b/dubbo-registry/dubbo-registry-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
new file mode 100644
index 0000000..d3d8c9b
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/test/java/org/apache/dubbo/registry/kubernetes/KubernetesServiceDiscoveryTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.kubernetes;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst;
+
+import io.fabric8.kubernetes.api.model.Endpoints;
+import io.fabric8.kubernetes.api.model.EndpointsBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesServer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * Exercises {@link KubernetesServiceDiscovery} against the fabric8 mock Kubernetes server.
+ * <p>
+ * Each test starts from the pod ("TestServer"), the service and the endpoints object
+ * (both named "TestService") that {@link #setUp()} creates through the mock client.
+ */
+@ExtendWith({MockitoExtension.class})
+public class KubernetesServiceDiscoveryTest {
+    public KubernetesServer mockServer = new KubernetesServer(true, true);
+
+    private KubernetesClient mockClient;
+
+    private ServiceInstancesChangedListener mockListener = Mockito.mock(ServiceInstancesChangedListener.class);
+
+    private URL serverUrl;
+
+    private Map<String, String> selector;
+
+    // Values the two fabric8 Config system properties held before setUp() overrode them;
+    // null means the property was not set. Restored in destroy().
+    private String oldTryKubeConfig;
+
+    private String oldTryServiceAccount;
+
+    /**
+     * Starts the mock server, derives the registry URL from its master URL and seeds the
+     * pod, service and endpoints objects the tests rely on.
+     */
+    @BeforeEach
+    public void setUp() {
+        mockServer.before();
+        mockClient = mockServer.getClient();
+
+        serverUrl = URL.valueOf(mockClient.getConfiguration().getMasterUrl())
+                .setProtocol("kubernetes")
+                .addParameter(KubernetesClientConst.TRUST_CERTS, "true")
+                .addParameter(KubernetesClientConst.HTTP2_DISABLE, "true");
+
+        // System.setProperty returns the previous value; keep it so destroy() can restore it.
+        // NOTE(review): presumably these switches stop the client built in initialize() from
+        // picking up a local kubeconfig or service account - confirm against fabric8 Config.
+        oldTryKubeConfig = System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false");
+        oldTryServiceAccount = System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false");
+
+        selector = new HashMap<>(4);
+        selector.put("l", "v");
+        Pod pod = new PodBuilder()
+                .withNewMetadata().withName("TestServer").withLabels(selector).endMetadata()
+                .build();
+
+        Service service = new ServiceBuilder()
+                .withNewMetadata().withName("TestService").endMetadata()
+                .withNewSpec().withSelector(selector).endSpec().build();
+
+        Endpoints endPoints = new EndpointsBuilder()
+                .withNewMetadata().withName("TestService").endMetadata()
+                .addNewSubset()
+                .addNewAddress().withIp("ip1")
+                .withNewTargetRef().withUid("uid1").withName("TestServer").endTargetRef().endAddress()
+                .addNewPort("Test", "Test", 12345, "TCP").endSubset()
+                .build();
+
+        mockClient.pods().create(pod);
+        mockClient.services().create(service);
+        mockClient.endpoints().create(endPoints);
+    }
+
+    /** Stops the mock server and puts the overridden system properties back. */
+    @AfterEach
+    public void destroy() {
+        try {
+            mockServer.after();
+        } finally {
+            // Do not leak the overrides into other tests running in the same JVM.
+            restoreProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, oldTryKubeConfig);
+            restoreProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, oldTryServiceAccount);
+        }
+    }
+
+    /** Resets a system property to its pre-test value, clearing it if it was unset before. */
+    private static void restoreProperty(String key, String oldValue) {
+        if (oldValue == null) {
+            System.clearProperty(key);
+        } else {
+            System.setProperty(key, oldValue);
+        }
+    }
+
+    /**
+     * Creates a discovery, initializes it from {@code serverUrl}, sets its current hostname
+     * to "TestServer" and swaps in the mock client.
+     */
+    private KubernetesServiceDiscovery newServiceDiscovery() throws Exception {
+        KubernetesServiceDiscovery serviceDiscovery = new KubernetesServiceDiscovery();
+        serviceDiscovery.initialize(serverUrl);
+
+        serviceDiscovery.setCurrentHostname("TestServer");
+        serviceDiscovery.setKubernetesClient(mockClient);
+        return serviceDiscovery;
+    }
+
+    /** Stubs the mock listener to report "TestService" and adds it to the given discovery. */
+    private void listenForTestService(KubernetesServiceDiscovery serviceDiscovery) throws Exception {
+        HashSet<String> serviceList = new HashSet<>(4);
+        serviceList.add("TestService");
+        Mockito.when(mockListener.getServiceNames()).thenReturn(serviceList);
+        Mockito.doNothing().when(mockListener).onEvent(Mockito.any());
+
+        serviceDiscovery.addServiceInstancesChangedListener(mockListener);
+    }
+
+    /**
+     * Sleeps five seconds (presumably to let the asynchronous notification arrive), verifies
+     * that the mock listener received exactly one event and returns that event.
+     */
+    private ServiceInstancesChangedEvent awaitSingleEvent() throws Exception {
+        Thread.sleep(5000);
+        ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
+                ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+        Mockito.verify(mockListener).onEvent(eventArgumentCaptor.capture());
+        return eventArgumentCaptor.getValue();
+    }
+
+    /** Adding an address to the endpoints must notify the listener with two instances. */
+    @Test
+    public void testEndpointsUpdate() throws Exception {
+        KubernetesServiceDiscovery serviceDiscovery = newServiceDiscovery();
+        try {
+            ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345);
+            serviceDiscovery.register(serviceInstance);
+            listenForTestService(serviceDiscovery);
+
+            mockClient.endpoints().withName("TestService").edit().editFirstSubset()
+                    .addNewAddress().withIp("ip2")
+                    .withNewTargetRef().withUid("uid2").withName("TestServer").endTargetRef().endAddress()
+                    .addNewPort("Test", "Test", 12345, "TCP").endSubset()
+                    .done();
+
+            Assertions.assertEquals(2, awaitSingleEvent().getServiceInstances().size());
+
+            serviceDiscovery.unregister(serviceInstance);
+        } finally {
+            // Always release the discovery, even when an assertion above fails.
+            serviceDiscovery.destroy();
+        }
+    }
+
+    /** Updating the registered instance must notify the listener with one instance. */
+    @Test
+    public void testPodsUpdate() throws Exception {
+        KubernetesServiceDiscovery serviceDiscovery = newServiceDiscovery();
+        try {
+            ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345);
+            serviceDiscovery.register(serviceInstance);
+            listenForTestService(serviceDiscovery);
+
+            serviceInstance = new DefaultServiceInstance("TestService", "Test12345", 12345);
+            serviceDiscovery.update(serviceInstance);
+
+            Assertions.assertEquals(1, awaitSingleEvent().getServiceInstances().size());
+
+            serviceDiscovery.unregister(serviceInstance);
+        } finally {
+            // Always release the discovery, even when an assertion above fails.
+            serviceDiscovery.destroy();
+        }
+    }
+
+    /** After register and update, one service, one instance and the local instance are reported. */
+    @Test
+    public void testGetInstance() throws Exception {
+        KubernetesServiceDiscovery serviceDiscovery = newServiceDiscovery();
+        try {
+            ServiceInstance serviceInstance = new DefaultServiceInstance("TestService", "Test", 12345);
+            serviceDiscovery.register(serviceInstance);
+
+            serviceDiscovery.update(serviceInstance);
+
+            Assertions.assertEquals(1, serviceDiscovery.getServices().size());
+            Assertions.assertEquals(1, serviceDiscovery.getInstances("TestService").size());
+
+            Assertions.assertEquals(serviceInstance, serviceDiscovery.getLocalInstance());
+
+            serviceDiscovery.unregister(serviceInstance);
+        } finally {
+            // Always release the discovery, even when an assertion above fails.
+            serviceDiscovery.destroy();
+        }
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/dubbo-registry/dubbo-registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..ca6ee9c
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
\ No newline at end of file
diff --git a/dubbo-registry/pom.xml b/dubbo-registry/pom.xml
index 5ab688a..92f691b 100644
--- a/dubbo-registry/pom.xml
+++ b/dubbo-registry/pom.xml
@@ -34,6 +34,7 @@
         <!--        <module>dubbo-registry-default</module>-->
         <!--        <module>dubbo-registry-multicast</module>-->
         <module>dubbo-registry-zookeeper</module>
+        <module>dubbo-registry-kubernetes</module>
         <!--        <module>dubbo-registry-redis</module>-->
         <!--        <module>dubbo-registry-consul</module>-->
         <!--        <module>dubbo-registry-etcd3</module>-->