You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/08/13 12:31:25 UTC

[dubbo] 05/13: consul service discovery complement

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch cloud-native
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 705ee975717e5e5dacfc9f8db8c02abb74e66e14
Author: ken.lj <ke...@gmail.com>
AuthorDate: Mon Aug 12 17:25:31 2019 +0800

    consul service discovery complement
---
 .../registry/consul/ConsulServiceDiscovery.java    | 28 ++++++++++++++++++----
 1 file changed, 23 insertions(+), 5 deletions(-)

diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
index ce7efc6..503823a 100644
--- a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
@@ -21,10 +21,12 @@ import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.event.EventListener;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
 import org.apache.dubbo.registry.client.ServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
 import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
 
 import com.ecwid.consul.v1.ConsulClient;
 import com.ecwid.consul.v1.QueryParams;
@@ -36,6 +38,7 @@ import com.ecwid.consul.v1.health.model.HealthService;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -67,6 +70,7 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S
             new NamedThreadFactory("dubbo-consul-notifier", true));
     private TtlScheduler ttlScheduler;
     private long checkPassInterval;
+    private URL url;
 
     public ConsulServiceDiscovery(URL url) {
         String host = url.getHost();
@@ -74,6 +78,7 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S
         checkPassInterval = url.getParameter(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL);
         client = new ConsulClient(host, port);
         ttlScheduler = new TtlScheduler(checkPassInterval, client);
+        this.url = url;
     }
 
     @Override
@@ -122,9 +127,23 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S
 
     @Override
     public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
-        List<ServiceInstance> instances;
-        Response<List<HealthService>> response = getHealthServices(serviceName, -1, buildWatchTimeout(url));
-        urls = convert(response.getValue(), url);
+        Response<List<HealthService>> response = getHealthServices(serviceName, -1, buildWatchTimeout());
+        return convert(response.getValue());
+    }
+
+    private List<ServiceInstance> convert(List<HealthService> services) {
+        return services.stream()
+                .map(HealthService::getService)
+                .filter(service -> Objects.nonNull(service) && service.getMeta().containsKey(ServiceInstanceMetadataUtils.METADATA_SERVICE_URL_PARAMS_KEY))
+                .map(service -> {
+                    ServiceInstance instance = new DefaultServiceInstance(
+                            service.getService(),
+                            service.getAddress(),
+                            service.getPort());
+                    instance.getMetadata().putAll(service.getMeta());
+                    return instance;
+                })
+                .collect(Collectors.toList());
     }
 
     private Response<List<HealthService>> getHealthServices(String service, long index, int watchTimeout) {
@@ -170,11 +189,10 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S
         return check;
     }
 
-    private int buildWatchTimeout(URL url) {
+    private int buildWatchTimeout() {
         return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000;
     }
 
-
     private class ConsulNotifier implements Runnable {
         private ServiceInstance serviceInstance;
         private long consulIndex;