You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/08/13 12:31:25 UTC
[dubbo] 05/13: consul service discovery complement
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch cloud-native
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 705ee975717e5e5dacfc9f8db8c02abb74e66e14
Author: ken.lj <ke...@gmail.com>
AuthorDate: Mon Aug 12 17:25:31 2019 +0800
consul service discovery complement
---
.../registry/consul/ConsulServiceDiscovery.java | 28 ++++++++++++++++++----
1 file changed, 23 insertions(+), 5 deletions(-)
diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
index ce7efc6..503823a 100644
--- a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
@@ -21,10 +21,12 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.event.EventListener;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.QueryParams;
@@ -36,6 +38,7 @@ import com.ecwid.consul.v1.health.model.HealthService;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -67,6 +70,7 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S
new NamedThreadFactory("dubbo-consul-notifier", true));
private TtlScheduler ttlScheduler;
private long checkPassInterval;
+ private URL url;
public ConsulServiceDiscovery(URL url) {
String host = url.getHost();
@@ -74,6 +78,7 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S
checkPassInterval = url.getParameter(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL);
client = new ConsulClient(host, port);
ttlScheduler = new TtlScheduler(checkPassInterval, client);
+ this.url = url;
}
@Override
@@ -122,9 +127,23 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S
@Override
public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException { // returns the instances obtained from the Consul health query for serviceName
- List<ServiceInstance> instances;
- Response<List<HealthService>> response = getHealthServices(serviceName, -1, buildWatchTimeout(url));
- urls = convert(response.getValue(), url);
+ Response<List<HealthService>> response = getHealthServices(serviceName, -1, buildWatchTimeout()); // health query with index -1 and the watch timeout derived from the url field
+ return convert(response.getValue()); // NOTE(review): response.getValue() is handed to convert() unchecked
+ }
+
+    /**
+     * Converts Consul health entries into Dubbo {@link ServiceInstance}s.
+     * <p>
+     * Entries without a service payload, or whose metadata is missing or lacks
+     * {@link ServiceInstanceMetadataUtils#METADATA_SERVICE_URL_PARAMS_KEY}, are skipped.
+     * Every remaining service becomes a {@link DefaultServiceInstance} built from its
+     * service name, address and port, with all of its Consul metadata copied over.
+     *
+     * @param services health entries returned by Consul; {@code null} is treated as empty
+     * @return the converted instances, never {@code null}
+     */
+    private List<ServiceInstance> convert(List<HealthService> services) {
+        // A response without a value must yield an empty result, not a NullPointerException.
+        List<HealthService> healthServices =
+                services == null ? Collections.<HealthService>emptyList() : services;
+        return healthServices.stream()
+                .map(HealthService::getService)
+                // Guard against a null meta map instead of assuming one is always present.
+                .filter(service -> Objects.nonNull(service)
+                        && Objects.nonNull(service.getMeta())
+                        && service.getMeta().containsKey(ServiceInstanceMetadataUtils.METADATA_SERVICE_URL_PARAMS_KEY))
+                .map(service -> {
+                    ServiceInstance instance = new DefaultServiceInstance(
+                            service.getService(),
+                            service.getAddress(),
+                            service.getPort());
+                    instance.getMetadata().putAll(service.getMeta());
+                    return instance;
+                })
+                .collect(Collectors.toList());
     }
private Response<List<HealthService>> getHealthServices(String service, long index, int watchTimeout) {
@@ -170,11 +189,10 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S
return check;
}
- private int buildWatchTimeout(URL url) {
+ private int buildWatchTimeout() { // no URL argument any more; the body below resolves url to the instance field
return url.getParameter(WATCH_TIMEOUT, DEFAULT_WATCH_TIMEOUT) / 1000; // WATCH_TIMEOUT parameter (falling back to DEFAULT_WATCH_TIMEOUT), integer-divided by 1000; presumably milliseconds -> seconds, TODO confirm
}
-
private class ConsulNotifier implements Runnable {
private ServiceInstance serviceInstance;
private long consulIndex;