You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2019/06/27 02:09:33 UTC
[dubbo] branch 2.6.x updated: Nacos Dubbo Registry can't sense the
change (#4392)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/2.6.x by this push:
new 87a2f07 Nacos Dubbo Registry can't sense the change (#4392)
87a2f07 is described below
commit 87a2f075422037c541a1263960a9efcd41fd9fc0
Author: Mercy Ma <me...@gmail.com>
AuthorDate: Thu Jun 27 10:09:24 2019 +0800
Nacos Dubbo Registry can't sense the change (#4392)
fixes #4348 :
---
.../dubbo/registry/support/DubboRegistration.java | 93 ----
.../dubbo/registry/support/Registration.java | 53 ---
.../registry/support/ServiceInstanceRegistry.java | 471 ---------------------
.../dubbo/registry/nacos/NacosRegistry.java | 434 ++++++++++++++++---
4 files changed, 377 insertions(+), 674 deletions(-)
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/DubboRegistration.java b/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/DubboRegistration.java
deleted file mode 100644
index 50deb0a..0000000
--- a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/DubboRegistration.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.dubbo.registry.support;
-
-import java.util.Map;
-
-/**
- * Dubbo Registration
- *
- * @since 2.6.6
- */
-class DubboRegistration implements Registration {
-
- private String serviceName;
-
- private String ip;
-
- private int port;
-
- private Map<String, String> metadata;
-
- @Override
- public String getServiceName() {
- return serviceName;
- }
-
- public void setServiceName(String serviceName) {
- this.serviceName = serviceName;
- }
-
- @Override
- public String getIp() {
- return ip;
- }
-
- public void setIp(String ip) {
- this.ip = ip;
- }
-
- @Override
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- @Override
- public Map<String, String> getMetadata() {
- return metadata;
- }
-
- public void setMetadata(Map<String, String> metadata) {
- this.metadata = metadata;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- DubboRegistration that = (DubboRegistration) o;
-
- if (port != that.port) return false;
- if (serviceName != null ? !serviceName.equals(that.serviceName) : that.serviceName != null) return false;
- if (ip != null ? !ip.equals(that.ip) : that.ip != null) return false;
- return metadata != null ? metadata.equals(that.metadata) : that.metadata == null;
- }
-
- @Override
- public int hashCode() {
- int result = serviceName != null ? serviceName.hashCode() : 0;
- result = 31 * result + (ip != null ? ip.hashCode() : 0);
- result = 31 * result + port;
- result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
- return result;
- }
-}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/Registration.java b/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/Registration.java
deleted file mode 100644
index 143cc9f..0000000
--- a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/Registration.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.dubbo.registry.support;
-
-import java.util.Map;
-
-/**
- * The Registration
- *
- * @since 2.6.6
- */
-public interface Registration {
-
- /**
- * @return The service name
- */
- String getServiceName();
-
- /**
- * @return The IP address
- */
- String getIp();
-
- /**
- * @return The service port
- */
- int getPort();
-
- /**
- * @return The read-only metadata
- */
- Map<String, String> getMetadata();
-
- @Override
- boolean equals(Object o);
-
- @Override
- int hashCode();
-}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/ServiceInstanceRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/ServiceInstanceRegistry.java
deleted file mode 100644
index dd19604..0000000
--- a/dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/support/ServiceInstanceRegistry.java
+++ /dev/null
@@ -1,471 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.dubbo.registry.support;
-
-import com.alibaba.dubbo.common.Constants;
-import com.alibaba.dubbo.common.URL;
-import com.alibaba.dubbo.common.logger.Logger;
-import com.alibaba.dubbo.common.logger.LoggerFactory;
-import com.alibaba.dubbo.common.utils.StringUtils;
-import com.alibaba.dubbo.common.utils.UrlUtils;
-import com.alibaba.dubbo.registry.NotifyListener;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static com.alibaba.dubbo.common.Constants.CONFIGURATORS_CATEGORY;
-import static com.alibaba.dubbo.common.Constants.CONSUMERS_CATEGORY;
-import static com.alibaba.dubbo.common.Constants.PROVIDERS_CATEGORY;
-import static com.alibaba.dubbo.common.Constants.ROUTERS_CATEGORY;
-import static java.lang.Long.getLong;
-import static java.lang.System.getProperty;
-
-/**
- * {@link FailbackRegistry} extension that is used as the Oriented Service Instance registration, for
- *
- * @param <S> The actual type of service instance
- * @since 2.6.6
- */
-public abstract class ServiceInstanceRegistry<S> extends FailbackRegistry {
-
- /**
- * All supported categories
- */
- private static final String[] ALL_SUPPORTED_CATEGORIES = of(
- PROVIDERS_CATEGORY,
- CONSUMERS_CATEGORY,
- ROUTERS_CATEGORY,
- CONFIGURATORS_CATEGORY
- );
-
- private static final int CATEGORY_INDEX = 0;
-
- private static final int SERVICE_INTERFACE_INDEX = CATEGORY_INDEX + 1;
-
- private static final int SERVICE_VERSION_INDEX = SERVICE_INTERFACE_INDEX + 1;
-
- private static final int SERVICE_GROUP_INDEX = SERVICE_VERSION_INDEX + 1;
-
- private static final String WILDCARD = "*";
-
- /**
- * The separator for service name
- */
- private static final String SERVICE_NAME_SEPARATOR = getProperty("dubbo.service.name.separator", ":");
-
- /**
- * The interval in second of lookup service names(only for Dubbo-OPS)
- */
- private static final long LOOKUP_INTERVAL = getLong("dubbo.service.names.lookup.interval", 30);
-
- protected final Logger logger = LoggerFactory.getLogger(getClass());
-
- /**
- * {@link ScheduledExecutorService} lookup service names(only for Dubbo-OPS)
- */
- private volatile ScheduledExecutorService serviceNamesScheduler;
-
- public ServiceInstanceRegistry(URL url) {
- super(url);
- }
-
- @Override
- protected final void doRegister(URL url) {
- String serviceName = getServiceName(url);
- Registration registration = createRegistration(serviceName, url);
- register(serviceName, toServiceInstance(registration), url);
- }
-
- @Override
- protected final void doUnregister(URL url) {
- String serviceName = getServiceName(url);
- Registration registration = createRegistration(serviceName, url);
- deregister(serviceName, toServiceInstance(registration), url);
- }
-
- @Override
- protected final void doSubscribe(URL url, NotifyListener listener) {
- Set<String> serviceNames = getServiceNames(url, listener);
- doSubscribe(url, listener, serviceNames);
- }
-
- @Override
- protected void doUnsubscribe(URL url, NotifyListener listener) {
- if (isAdminProtocol(url)) {
- shutdownServiceNamesLookup();
- }
- }
-
- /**
- * Adapts {@link Registration} to an actual service instance
- *
- * @param registration {@link Registration}
- * @return
- */
- protected abstract S toServiceInstance(Registration registration);
-
- /**
- * Adapts {@link S} to an {@link Registration}
- *
- * @param serviceInstance {@link S}
- * @return an {@link Registration}
- */
- protected abstract Registration toRegistration(S serviceInstance);
-
- /**
- * Register a {@link S service instance}
- *
- * @param serviceName the service name
- * @param serviceInstance {@link S service instance}
- * @param url Dubbo's {@link URL}
- */
- protected abstract void register(String serviceName, S serviceInstance, URL url);
-
- /**
- * Deregister a {@link DubboRegistration Dubbol registration}
- *
- * @param serviceName the service name
- * @param serviceInstance {@link S service instance}
- * @param url Dubbo's {@link URL}
- */
- protected abstract void deregister(String serviceName, S serviceInstance, URL url);
-
- private void doSubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
- Collection<S> serviceInstances = new LinkedList<S>();
-
- for (String serviceName : serviceNames) {
- serviceInstances.addAll(findServiceInstances(serviceName));
- }
- notifySubscriber(url, listener, serviceInstances);
- }
-
- /**
- * Notify the Healthy {@link DubboRegistration service instance} to subscriber.
- *
- * @param url {@link URL}
- * @param listener {@link NotifyListener}
- * @param serviceInstances all {@link S registrations}
- */
- private void notifySubscriber(URL url, NotifyListener listener, Collection<S> serviceInstances) {
- Set<S> healthyServiceInstances = new LinkedHashSet<S>(serviceInstances);
- // Healthy Instances
- filterHealthyInstances(healthyServiceInstances);
- List<URL> urls = buildURLs(url, healthyServiceInstances);
- this.notify(url, listener, urls);
- }
-
- private void filterHealthyInstances(Collection<S> serviceInstances) {
- filter(serviceInstances, new Filter<S>() {
- @Override
- public boolean accept(S serviceInstance) {
- return filterHealthyRegistration(serviceInstance);
- }
- });
- }
-
- /**
- * Find the {@link Collection} of {@link S service instances} by the service name
- *
- * @param serviceName the service name
- * @return a {@link Collection} of {@link S service instances}
- */
- protected abstract Collection<S> findServiceInstances(String serviceName);
-
- /**
- * Filter Healthy the {@link S service instance}
- *
- * @param serviceInstance the {@link S service instance}
- * @return if healthy , return <code>true</code>
- */
- protected abstract boolean filterHealthyRegistration(S serviceInstance);
-
- private void shutdownServiceNamesLookup() {
- if (serviceNamesScheduler != null) {
- serviceNamesScheduler.shutdown();
- }
- }
-
- private void scheduleServiceNamesLookup(final URL url,
- final NotifyListener listener) {
- if (serviceNamesScheduler == null) {
- serviceNamesScheduler = Executors.newSingleThreadScheduledExecutor();
- serviceNamesScheduler.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- Set<String> serviceNames = findAllServiceNames();
- filter(serviceNames, new Filter<String>() {
- @Override
- public boolean accept(String serviceName) {
- boolean accepted = false;
- for (String category : ALL_SUPPORTED_CATEGORIES) {
- String prefix = category + SERVICE_NAME_SEPARATOR;
- if (serviceName.startsWith(prefix)) {
- accepted = true;
- break;
- }
- }
- return accepted;
- }
- });
- doSubscribe(url, listener, serviceNames);
- }
- }, LOOKUP_INTERVAL, LOOKUP_INTERVAL, TimeUnit.SECONDS);
- }
- }
-
- /**
- * Find all service names
- *
- * @return all service names
- */
- protected abstract Set<String> findAllServiceNames();
-
- private List<URL> buildURLs(URL consumerURL, Collection<S> serviceInstances) {
- if (serviceInstances.isEmpty()) {
- return Collections.emptyList();
- }
- List<URL> urls = new LinkedList<URL>();
- for (S serviceInstance : serviceInstances) {
- Registration registration = toRegistration(serviceInstance);
- URL url = buildURL(registration);
- if (UrlUtils.isMatch(consumerURL, url)) {
- urls.add(url);
- }
- }
- return urls;
- }
-
- private URL buildURL(Registration registration) {
- URL url = new URL(registration.getMetadata().get(Constants.PROTOCOL_KEY),
- registration.getIp(), registration.getPort(),
- registration.getMetadata());
- return url;
- }
-
- /**
- * Get the service names for Dubbo OPS
- *
- * @param url {@link URL}
- * @return non-null
- */
- private Set<String> getSubscribedServiceNamesForOps(URL url) {
- Set<String> serviceNames = findAllServiceNames();
- filterServiceNames(serviceNames, url);
- return serviceNames;
- }
-
- private <T> void filter(Collection<T> collection, Filter<T> filter) {
- Iterator<T> iterator = collection.iterator();
- while (iterator.hasNext()) {
- T data = iterator.next();
- if (!filter.accept(data)) { // remove if not accept
- iterator.remove();
- }
- }
- }
-
- private void filterServiceNames(Set<String> serviceNames, URL url) {
-
- final String[] categories = getCategories(url);
-
- final String targetServiceInterface = url.getServiceInterface();
-
- final String targetVersion = url.getParameter(Constants.VERSION_KEY);
-
- final String targetGroup = url.getParameter(Constants.GROUP_KEY);
-
- filter(serviceNames, new Filter<String>() {
- @Override
- public boolean accept(String serviceName) {
- // split service name to segments
- // (required) segments[0] = category
- // (required) segments[1] = serviceInterface
- // (required) segments[2] = version
- // (optional) segments[3] = group
- String[] segments = getServiceSegments(serviceName);
- int length = segments.length;
- if (length < 4) { // must present 4 segments or more
- return false;
- }
-
- String category = getCategory(segments);
- if (Arrays.binarySearch(categories, category) > -1) { // no match category
- return false;
- }
-
- String serviceInterface = getServiceInterface(segments);
- if (!WILDCARD.equals(targetServiceInterface)
- && !StringUtils.isEquals(targetServiceInterface, serviceInterface)) { // no match interface
- return false;
- }
-
- String version = getServiceVersion(segments);
- if (!WILDCARD.equals(targetVersion)
- && !StringUtils.isEquals(targetVersion, version)) { // no match service
- // version
- return false;
- }
-
- String group = getServiceGroup(segments);
- if (group != null && !WILDCARD.equals(targetGroup)
- && !StringUtils.isEquals(targetGroup, group)) { // no match service
- // group
- return false;
- }
-
- return true;
- }
- });
- }
-
- protected Registration createRegistration(String serviceName, URL url) {
- // Append default category if absent
- String category = url.getParameter(Constants.CATEGORY_KEY,
- Constants.DEFAULT_CATEGORY);
- URL newURL = url.addParameter(Constants.CATEGORY_KEY, category);
- newURL = newURL.addParameter(Constants.PROTOCOL_KEY, url.getProtocol());
- String ip = url.getHost();
- int port = url.getPort();
- DubboRegistration registration = new DubboRegistration();
- registration.setServiceName(serviceName);
- registration.setIp(ip);
- registration.setPort(port);
- registration.setMetadata(new LinkedHashMap<String, String>(newURL.getParameters()));
-
- return registration;
- }
-
- /**
- * Get the categories from {@link URL}
- *
- * @param url {@link URL}
- * @return non-null array
- */
- private String[] getCategories(URL url) {
- return Constants.ANY_VALUE.equals(url.getServiceInterface())
- ? ALL_SUPPORTED_CATEGORIES
- : of(Constants.DEFAULT_CATEGORY);
- }
-
- /**
- * A filter
- */
- private interface Filter<T> {
-
- /**
- * Tests whether or not the specified data should be accepted.
- *
- * @param data The data to be tested
- * @return <code>true</code> if and only if <code>data</code> should be accepted
- */
- boolean accept(T data);
-
- }
-
- /**
- * Get the subscribed service names from the specified {@link URL url}
- *
- * @param url {@link URL}
- * @param listener {@link NotifyListener}
- * @return non-null
- */
- private Set<String> getServiceNames(URL url, NotifyListener listener) {
- if (isAdminProtocol(url)) {
- scheduleServiceNamesLookup(url, listener);
- return getSubscribedServiceNamesForOps(url);
- } else {
- return getServiceNames(url);
- }
- }
-
- private Set<String> getServiceNames(URL url) {
- String[] categories = getCategories(url);
- Set<String> serviceNames = new LinkedHashSet<String>(categories.length);
- for (String category : categories) {
- final String serviceName = getServiceName(url, category);
- serviceNames.add(serviceName);
- }
- return serviceNames;
- }
-
- private boolean isAdminProtocol(URL url) {
- return Constants.ADMIN_PROTOCOL.equals(url.getProtocol());
- }
-
- /**
- * Get the service name
- *
- * @param url {@link URL}
- * @return non-null
- */
- public static String getServiceName(URL url) {
- String category = url.getParameter(Constants.CATEGORY_KEY,
- Constants.DEFAULT_CATEGORY);
- return getServiceName(url, category);
- }
-
- private static String getServiceName(URL url, String category) {
- StringBuilder serviceNameBuilder = new StringBuilder(category);
- append(serviceNameBuilder, url, Constants.INTERFACE_KEY);
- append(serviceNameBuilder, url, Constants.VERSION_KEY);
- append(serviceNameBuilder, url, Constants.GROUP_KEY);
- return serviceNameBuilder.toString();
- }
-
- private static void append(StringBuilder target, URL url,
- String parameterName) {
- target.append(SERVICE_NAME_SEPARATOR);
- String parameterValue = url.getParameter(parameterName);
- if (StringUtils.isNotEmpty(parameterValue)) {
- target.append(parameterValue);
- }
- }
-
- public static String[] getServiceSegments(String serviceName) {
- return serviceName.split(SERVICE_NAME_SEPARATOR);
- }
-
- public static String getCategory(String[] segments) {
- return segments[CATEGORY_INDEX];
- }
-
- public static String getServiceInterface(String[] segments) {
- return segments[SERVICE_INTERFACE_INDEX];
- }
-
- public static String getServiceVersion(String[] segments) {
- return segments[SERVICE_VERSION_INDEX];
- }
-
- public static String getServiceGroup(String[] segments) {
- return segments.length > 4 ? segments[SERVICE_GROUP_INDEX] : null;
- }
-
- private static <T> T[] of(T... values) {
- return values;
- }
-}
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/com/alibaba/dubbo/registry/nacos/NacosRegistry.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/com/alibaba/dubbo/registry/nacos/NacosRegistry.java
index 621e7b4..20ce09a 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/com/alibaba/dubbo/registry/nacos/NacosRegistry.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/com/alibaba/dubbo/registry/nacos/NacosRegistry.java
@@ -16,37 +16,98 @@
*/
package com.alibaba.dubbo.registry.nacos;
+import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
+import com.alibaba.dubbo.common.utils.NetUtils;
+import com.alibaba.dubbo.common.utils.UrlUtils;
+import com.alibaba.dubbo.registry.NotifyListener;
import com.alibaba.dubbo.registry.Registry;
-import com.alibaba.dubbo.registry.support.Registration;
-import com.alibaba.dubbo.registry.support.ServiceInstanceRegistry;
+import com.alibaba.dubbo.registry.support.FailbackRegistry;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
+import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
+import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ListView;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedHashSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static com.alibaba.dubbo.common.Constants.CONFIGURATORS_CATEGORY;
+import static com.alibaba.dubbo.common.Constants.CONSUMERS_CATEGORY;
+import static com.alibaba.dubbo.common.Constants.PROVIDERS_CATEGORY;
+import static com.alibaba.dubbo.common.Constants.ROUTERS_CATEGORY;
/**
* Nacos {@link Registry}
*
- * @since 2.6.6
+ * @see #SERVICE_NAME_SEPARATOR
+ * @see #PAGINATION_SIZE
+ * @see #LOOKUP_INTERVAL
+ * @since 2.6.5
*/
-public class NacosRegistry extends ServiceInstanceRegistry<Instance> {
+public class NacosRegistry extends FailbackRegistry {
+
+ /**
+ * All supported categories
+ */
+ private static final String[] ALL_SUPPORTED_CATEGORIES = of(
+ PROVIDERS_CATEGORY,
+ CONSUMERS_CATEGORY,
+ ROUTERS_CATEGORY,
+ CONFIGURATORS_CATEGORY
+ );
+
+ private static final int CATEGORY_INDEX = 0;
+
+ private static final int SERVICE_INTERFACE_INDEX = 1;
+
+ private static final int SERVICE_VERSION_INDEX = 2;
+
+ private static final int SERVICE_GROUP_INDEX = 3;
+
+ private static final String WILDCARD = "*";
+
+ /**
+ * The separator for service name
+ *
+ * @revert changed from a constant to a configurable system property; this is meant for Windows file names, to
+ * stay compatible with old Nacos binary releases (< 0.6.1)
+ */
+ private static final String SERVICE_NAME_SEPARATOR = System.getProperty("nacos.service.name.separator", ":");
/**
* The pagination size of query for Nacos service names(only for Dubbo-OPS)
*/
private static final int PAGINATION_SIZE = Integer.getInteger("nacos.service.names.pagination.size", 100);
+ /**
+ * The interval, in seconds, between lookups of Nacos service names (only for Dubbo-OPS)
+ */
+ private static final long LOOKUP_INTERVAL = Long.getLong("nacos.service.names.lookup.interval", 30);
+
+ /**
+ * {@link ScheduledExecutorService} that periodically looks up Nacos service names (only for Dubbo-OPS)
+ */
+ private volatile ScheduledExecutorService scheduledExecutorService;
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
private final NamingService namingService;
private final ConcurrentMap<String, EventListener> nacosListeners;
@@ -57,83 +118,140 @@ public class NacosRegistry extends ServiceInstanceRegistry<Instance> {
this.nacosListeners = new ConcurrentHashMap<String, EventListener>();
}
-
@Override
- protected Instance toServiceInstance(Registration registration) {
- Instance instance = new Instance();
- instance.setServiceName(registration.getServiceName());
- instance.setIp(registration.getIp());
- instance.setPort(registration.getPort());
- instance.setMetadata(registration.getMetadata());
- return instance;
+ public boolean isAvailable() {
+ return "UP".equals(namingService.getServerStatus());
}
@Override
- protected Registration toRegistration(final Instance serviceInstance) {
- return new Registration() {
-
- @Override
- public String getServiceName() {
- return serviceInstance.getServiceName();
- }
-
- @Override
- public String getIp() {
- return serviceInstance.getIp();
- }
-
- @Override
- public int getPort() {
- return serviceInstance.getPort();
- }
-
+ public List<URL> lookup(final URL url) {
+ final List<URL> urls = new LinkedList<URL>();
+ execute(new NamingServiceCallback() {
@Override
- public Map<String, String> getMetadata() {
- return serviceInstance.getMetadata();
+ public void callback(NamingService namingService) throws NacosException {
+ List<String> serviceNames = getServiceNames(url, null);
+ for (String serviceName : serviceNames) {
+ List<Instance> instances = namingService.getAllInstances(serviceName);
+ urls.addAll(buildURLs(url, instances));
+ }
}
- };
+ });
+ return urls;
}
- @Override
- protected void register(final String serviceName, final Instance serviceInstance, URL url) {
+ protected void doRegister(URL url) {
+ final String serviceName = getServiceName(url);
+ final Instance instance = createInstance(url);
execute(new NamingServiceCallback() {
- @Override
public void callback(NamingService namingService) throws NacosException {
- namingService.registerInstance(serviceName, serviceInstance);
+ namingService.registerInstance(serviceName, instance);
}
});
}
- @Override
- protected void deregister(final String serviceName, final Instance serviceInstance, URL url) {
+ protected void doUnregister(final URL url) {
execute(new NamingServiceCallback() {
- @Override
public void callback(NamingService namingService) throws NacosException {
- namingService.deregisterInstance(serviceName, serviceInstance.getIp(), serviceInstance.getPort());
+ String serviceName = getServiceName(url);
+ Instance instance = createInstance(url);
+ namingService.deregisterInstance(serviceName, instance.getIp(), instance.getPort());
}
});
}
- @Override
- protected Collection<Instance> findServiceInstances(final String serviceName) {
- final Collection<Instance> instances = new LinkedList<Instance>();
+ protected void doSubscribe(final URL url, final NotifyListener listener) {
+ List<String> serviceNames = getServiceNames(url, listener);
+ doSubscribe(url, listener, serviceNames);
+ }
+
+ private void doSubscribe(final URL url, final NotifyListener listener, final List<String> serviceNames) {
execute(new NamingServiceCallback() {
@Override
public void callback(NamingService namingService) throws NacosException {
- instances.addAll(namingService.getAllInstances(serviceName));
+ for (String serviceName : serviceNames) {
+ List<Instance> instances = namingService.getAllInstances(serviceName);
+ notifySubscriber(url, listener, instances);
+ subscribeEventListener(serviceName, url, listener);
+ }
}
});
- return instances;
}
@Override
- protected boolean filterHealthyRegistration(Instance serviceInstance) {
- return serviceInstance.isEnabled();
+ protected void doUnsubscribe(URL url, NotifyListener listener) {
+ if (isAdminProtocol(url)) {
+ shutdownServiceNamesLookup();
+ }
}
- @Override
- protected Set<String> findAllServiceNames() {
- final Set<String> serviceNames = new LinkedHashSet<String>();
+ private void shutdownServiceNamesLookup() {
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdown();
+ }
+ }
+
+ /**
+ * Get the service names from the specified {@link URL url}
+ *
+ * @param url {@link URL}
+ * @param listener {@link NotifyListener}
+ * @return non-null
+ */
+ private List<String> getServiceNames(URL url, NotifyListener listener) {
+ if (isAdminProtocol(url)) {
+ scheduleServiceNamesLookup(url, listener);
+ return getServiceNamesForOps(url);
+ } else {
+ return doGetServiceNames(url);
+ }
+ }
+
+ private boolean isAdminProtocol(URL url) {
+ return Constants.ADMIN_PROTOCOL.equals(url.getProtocol());
+ }
+
+ private void scheduleServiceNamesLookup(final URL url, final NotifyListener listener) {
+ if (scheduledExecutorService == null) {
+ scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ List<String> serviceNames = getAllServiceNames();
+ filterData(serviceNames, new NacosDataFilter<String>() {
+ @Override
+ public boolean accept(String serviceName) {
+ boolean accepted = false;
+ for (String category : ALL_SUPPORTED_CATEGORIES) {
+ String prefix = category + SERVICE_NAME_SEPARATOR;
+ if (StringUtils.startsWith(serviceName, prefix)) {
+ accepted = true;
+ break;
+ }
+ }
+ return accepted;
+ }
+ });
+ doSubscribe(url, listener, serviceNames);
+ }
+ }, LOOKUP_INTERVAL, LOOKUP_INTERVAL, TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * Get the service names for Dubbo OPS
+ *
+ * @param url {@link URL}
+ * @return non-null
+ */
+ private List<String> getServiceNamesForOps(URL url) {
+ List<String> serviceNames = getAllServiceNames();
+ filterServiceNames(serviceNames, url);
+ return serviceNames;
+ }
+
+ private List<String> getAllServiceNames() {
+
+ final List<String> serviceNames = new LinkedList<String>();
execute(new NamingServiceCallback() {
@Override
@@ -166,9 +284,177 @@ public class NacosRegistry extends ServiceInstanceRegistry<Instance> {
return serviceNames;
}
- @Override
- public boolean isAvailable() {
- return "UP".equals(namingService.getServerStatus());
+    /**
+     * Removes, in place, every service name that does not match the categories, service interface,
+     * version and group requested by the given {@link URL}.
+     * <p>
+     * Each name is split by {@code SERVICE_NAME_SEPARATOR} into category, service interface,
+     * version and an optional group; names with fewer than three segments are removed.
+     * A target equal to {@code WILDCARD} matches any value of its segment.
+     *
+     * @param serviceNames the service names to filter; modified by this method
+     * @param url          the {@link URL} carrying the target interface, version and group
+     */
+    private void filterServiceNames(List<String> serviceNames, URL url) {
+        final String[] categories = getCategories(url);
+        final String targetServiceInterface = url.getServiceInterface();
+        final String targetVersion = url.getParameter(Constants.VERSION_KEY);
+        final String targetGroup = url.getParameter(Constants.GROUP_KEY);
+
+        filterData(serviceNames, new NacosDataFilter<String>() {
+            @Override
+            public boolean accept(String serviceName) {
+                String[] parts = StringUtils.split(serviceName, SERVICE_NAME_SEPARATOR);
+                int count = parts.length;
+                if (count < 3) { // category, service interface and version are mandatory
+                    return false;
+                }
+                if (!ArrayUtils.contains(categories, parts[CATEGORY_INDEX])) {
+                    return false;
+                }
+                if (!matches(targetServiceInterface, parts[SERVICE_INTERFACE_INDEX])) {
+                    return false;
+                }
+                if (!matches(targetVersion, parts[SERVICE_VERSION_INDEX])) {
+                    return false;
+                }
+                if (count > 3) { // the group segment is optional and only checked when present
+                    String group = parts[SERVICE_GROUP_INDEX];
+                    return group == null || matches(targetGroup, group);
+                }
+                return true;
+            }
+
+            // A wildcard target matches anything; otherwise both values must be equal.
+            private boolean matches(String target, String actual) {
+                return WILDCARD.equals(target) || StringUtils.equals(target, actual);
+            }
+        });
+    }
+
+    /**
+     * Removes from the given collection, through its iterator, every element the filter rejects.
+     *
+     * @param collection the collection to filter in place
+     * @param filter     decides which elements are kept
+     * @param <T>        the element type
+     */
+    private <T> void filterData(Collection<T> collection, NacosDataFilter<T> filter) {
+        for (Iterator<T> it = collection.iterator(); it.hasNext(); ) {
+            if (filter.accept(it.next())) {
+                continue; // accepted elements stay in the collection
+            }
+            it.remove();
+        }
+    }
+
+    /**
+     * Builds one service name per category returned by {@link #getCategories(URL)}.
+     *
+     * @param url {@link URL}
+     * @return a new list holding the service names, in category order
+     */
+    private List<String> doGetServiceNames(URL url) {
+        String[] categories = getCategories(url);
+        int count = categories.length;
+        List<String> names = new ArrayList<String>(count);
+        for (int i = 0; i < count; i++) {
+            names.add(getServiceName(url, categories[i]));
+        }
+        return names;
+    }
+
+    /**
+     * Converts the given instances to {@link URL URLs} and keeps those for which
+     * {@code UrlUtils.isMatch(consumerURL, url)} holds.
+     *
+     * @param consumerURL the {@link URL} every built URL is matched against
+     * @param instances   the {@link Instance instances} to convert
+     * @return the matching URLs; an empty list when no instance is given
+     */
+    private List<URL> buildURLs(URL consumerURL, Collection<Instance> instances) {
+        if (instances.isEmpty()) {
+            return Collections.emptyList();
+        }
+        List<URL> matched = new LinkedList<URL>();
+        Iterator<Instance> it = instances.iterator();
+        while (it.hasNext()) {
+            URL candidate = buildURL(it.next());
+            if (!UrlUtils.isMatch(consumerURL, candidate)) {
+                continue;
+            }
+            matched.add(candidate);
+        }
+        return matched;
+    }
+
+    /**
+     * Registers a Nacos {@link EventListener} for the given service name unless
+     * {@code nacosListeners} already holds one for that name. Every {@link NamingEvent}
+     * received is forwarded to {@link #notifySubscriber(URL, NotifyListener, Collection)}.
+     *
+     * @param serviceName the Nacos service name to subscribe to
+     * @param url         the subscribing {@link URL}
+     * @param listener    the {@link NotifyListener} to notify on naming events
+     * @throws NacosException declared by the method; presumably raised by the
+     *                        {@code namingService.subscribe} call
+     */
+    private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
+            throws NacosException {
+        if (nacosListeners.containsKey(serviceName)) {
+            return; // a listener is already registered for this service name
+        }
+        EventListener nacosListener = new EventListener() {
+            @Override
+            public void onEvent(Event event) {
+                if (!(event instanceof NamingEvent)) {
+                    return;
+                }
+                NamingEvent namingEvent = (NamingEvent) event;
+                notifySubscriber(url, listener, namingEvent.getInstances());
+            }
+        };
+        namingService.subscribe(serviceName, nacosListener);
+        nacosListeners.put(serviceName, nacosListener);
+    }
+
+    /**
+     * Notifies the subscriber with the URLs built from the given {@link Instance instances}
+     * that pass {@link #filterHealthyInstances(Collection)}.
+     *
+     * @param url       {@link URL}
+     * @param listener  {@link NotifyListener}
+     * @param instances all {@link Instance instances}
+     */
+    private void notifySubscriber(URL url, NotifyListener listener, Collection<Instance> instances) {
+        // filter a copy so that the collection passed in is left untouched
+        List<Instance> candidates = new LinkedList<Instance>(instances);
+        filterHealthyInstances(candidates);
+
+        List<URL> urls = buildURLs(url, candidates);
+        if (logger.isInfoEnabled()) {
+            logger.info("The URLs[size : {}] are about to be notified from instances : {}", urls.size(), instances);
+        }
+        NacosRegistry.this.notify(url, listener, urls);
+    }
+
+    /**
+     * Get the categories from {@link URL}: all supported categories when the service interface
+     * of the URL equals {@code Constants.ANY_VALUE}, otherwise only the default category.
+     *
+     * @param url {@link URL}
+     * @return non-null array
+     */
+    private String[] getCategories(URL url) {
+        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
+            return ALL_SUPPORTED_CATEGORIES;
+        }
+        return of(Constants.DEFAULT_CATEGORY);
+    }
+
+    /**
+     * Builds a {@link URL} from the given {@link Instance}: the protocol is read from the
+     * instance metadata, host and port come from the instance, and the whole metadata map
+     * becomes the URL parameters.
+     *
+     * @param instance the {@link Instance} to convert
+     * @return the new {@link URL}
+     */
+    private URL buildURL(Instance instance) {
+        String protocol = instance.getMetadata().get(Constants.PROTOCOL_KEY);
+        return new URL(protocol, instance.getIp(), instance.getPort(), instance.getMetadata());
+    }
+
+    /**
+     * Creates the {@link Instance} to register for the given {@link URL}.
+     * <p>
+     * The metadata is a copy of the URL parameters with the category (default category if
+     * absent) and the protocol added. The IP is taken from {@code NetUtils.getLocalHost()},
+     * and the port is the bind port parameter, falling back to the port of the URL.
+     *
+     * @param url the {@link URL} to register
+     * @return the new {@link Instance}
+     */
+    private Instance createInstance(URL url) {
+        // make sure the metadata always carries a category and the protocol
+        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
+        URL enriched = url.addParameter(Constants.CATEGORY_KEY, category)
+                .addParameter(Constants.PROTOCOL_KEY, url.getProtocol());
+
+        String ip = NetUtils.getLocalHost();
+        int port = enriched.getParameter(Constants.BIND_PORT_KEY, url.getPort());
+
+        Instance created = new Instance();
+        created.setIp(ip);
+        created.setPort(port);
+        created.setMetadata(new HashMap<String, String>(enriched.getParameters()));
+        return created;
+    }
+
+    /**
+     * Builds the service name of the given {@link URL}, using its category parameter or the
+     * default category when that parameter is absent.
+     *
+     * @param url {@link URL}
+     * @return the service name
+     */
+    private String getServiceName(URL url) {
+        return getServiceName(url, url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY));
+    }
+
+    /**
+     * Builds the service name: the category followed by the interface, version and group
+     * parameters of the {@link URL}, each appended only when present.
+     *
+     * @param url      {@link URL}
+     * @param category the category the name starts with
+     * @return the service name
+     */
+    private String getServiceName(URL url, String category) {
+        StringBuilder name = new StringBuilder(category);
+        String[] keys = {Constants.INTERFACE_KEY, Constants.VERSION_KEY, Constants.GROUP_KEY};
+        for (String key : keys) {
+            appendIfPresent(name, url, key);
+        }
+        return name.toString();
+    }
+
+    /**
+     * Appends the service name separator and the value of the named {@link URL} parameter to
+     * the target, unless that value is blank.
+     *
+     * @param target        the builder to append to
+     * @param url           the {@link URL} the parameter is read from
+     * @param parameterName the name of the parameter
+     */
+    private void appendIfPresent(StringBuilder target, URL url, String parameterName) {
+        String value = url.getParameter(parameterName);
+        if (StringUtils.isBlank(value)) {
+            return; // nothing to append for a blank parameter
+        }
+        target.append(SERVICE_NAME_SEPARATOR).append(value);
     }
private void execute(NamingServiceCallback callback) {
@@ -181,8 +467,42 @@ public class NacosRegistry extends ServiceInstanceRegistry<Instance> {
}
}
+    /**
+     * Removes, in place, every {@link Instance} whose {@code isEnabled()} returns
+     * <code>false</code>.
+     * <p>
+     * NOTE(review): despite the method name, the check is on the enabled flag, not on a
+     * health flag — confirm this is intended.
+     *
+     * @param instances the instances to filter; modified by this method
+     */
+    private void filterHealthyInstances(Collection<Instance> instances) {
+        NacosDataFilter<Instance> enabledOnly = new NacosDataFilter<Instance>() {
+            @Override
+            public boolean accept(Instance instance) {
+                return instance.isEnabled();
+            }
+        };
+        filterData(instances, enabledOnly);
+    }
+
+    /**
+     * Returns the given variable arguments as an array.
+     *
+     * @param values the elements of the array
+     * @param <T>    the element type
+     * @return the array holding the given values
+     */
+    private static <T> T[] of(T... values) {
+        return values;
+    }
+
+
+    /**
+     * A predicate over Nacos data, used to decide which elements of a collection are kept.
+     *
+     * @param <T> the type of the data to test
+     * @since 2.6.5
+     */
+    private interface NacosDataFilter<T> {
+
+        /**
+         * Decides whether the specified data should be accepted.
+         *
+         * @param data the data to test
+         * @return <code>true</code> if <code>data</code> is accepted,
+         * <code>false</code> if it is rejected
+         */
+        boolean accept(T data);
+
+    }
+
/**
* {@link NamingService} Callback
+ *
+ * @since 2.6.5
*/
interface NamingServiceCallback {