You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/07/01 06:19:19 UTC
[dubbo] branch 3.0 updated: [3.0] Clear up env when destroy (#10193)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new b68f1147fa [3.0] Clear up env when destroy (#10193)
b68f1147fa is described below
commit b68f1147fa0986a1be035c30c52f7d466d6b25a4
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Fri Jul 1 14:19:14 2022 +0800
[3.0] Clear up env when destroy (#10193)
* [3.0] Clean up env on destroy
* fix import
* fix cleanup
* update cleanup
---
.../org/apache/dubbo/config/AbstractConfig.java | 2 +-
.../org/apache/dubbo/config/ReferenceConfig.java | 104 ++++++++++++---------
.../client/ServiceDiscoveryRegistryDirectory.java | 9 ++
.../registry/integration/RegistryDirectory.java | 82 +++++++++-------
4 files changed, 121 insertions(+), 76 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java
index 2fa74930a5..5d4ee5f1b4 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractConfig.java
@@ -647,7 +647,6 @@ public abstract class AbstractConfig implements Serializable {
* Dubbo config property override
*/
public void refresh() {
- refreshed.set(true);
try {
// check and init before do refresh
preProcessRefresh();
@@ -697,6 +696,7 @@ public abstract class AbstractConfig implements Serializable {
}
postProcessRefresh();
+ refreshed.set(true);
}
private void assignProperties(Object obj, Environment environment, Map<String, String> properties, InmemoryConfiguration configuration) {
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index f662b0ddf6..4da63749c7 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -250,47 +250,68 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
if (initialized) {
return;
}
- initialized = true;
-
- if (!this.isRefreshed()) {
- this.refresh();
- }
+ try {
+ if (!this.isRefreshed()) {
+ this.refresh();
+ }
- // init serviceMetadata
- initServiceMetadata(consumer);
+ // init serviceMetadata
+ initServiceMetadata(consumer);
- serviceMetadata.setServiceType(getServiceInterfaceClass());
- // TODO, uncomment this line once service key is unified
- serviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));
+ serviceMetadata.setServiceType(getServiceInterfaceClass());
+ // TODO, uncomment this line once service key is unified
+ serviceMetadata.setServiceKey(URL.buildKey(interfaceName, group, version));
- Map<String, String> referenceParameters = appendConfig();
- // init service-application mapping
- initServiceAppsMapping(referenceParameters);
+ Map<String, String> referenceParameters = appendConfig();
+ // init service-application mapping
+ initServiceAppsMapping(referenceParameters);
- ModuleServiceRepository repository = getScopeModel().getServiceRepository();
- ServiceDescriptor serviceDescriptor;
- if (CommonConstants.NATIVE_STUB.equals(getProxy())) {
- serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);
- repository.registerService(serviceDescriptor);
- } else {
- serviceDescriptor = repository.registerService(interfaceClass);
- }
- consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor, this,
- getScopeModel(), serviceMetadata, createAsyncMethodInfo());
+ ModuleServiceRepository repository = getScopeModel().getServiceRepository();
+ ServiceDescriptor serviceDescriptor;
+ if (CommonConstants.NATIVE_STUB.equals(getProxy())) {
+ serviceDescriptor = StubSuppliers.getServiceDescriptor(interfaceName);
+ repository.registerService(serviceDescriptor);
+ } else {
+ serviceDescriptor = repository.registerService(interfaceClass);
+ }
+ consumerModel = new ConsumerModel(serviceMetadata.getServiceKey(), proxy, serviceDescriptor, this,
+ getScopeModel(), serviceMetadata, createAsyncMethodInfo());
- repository.registerConsumer(consumerModel);
+ repository.registerConsumer(consumerModel);
- serviceMetadata.getAttachments().putAll(referenceParameters);
+ serviceMetadata.getAttachments().putAll(referenceParameters);
- ref = createProxy(referenceParameters);
+ ref = createProxy(referenceParameters);
- serviceMetadata.setTarget(ref);
- serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
+ serviceMetadata.setTarget(ref);
+ serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
- consumerModel.setProxyObject(ref);
- consumerModel.initMethodModels();
+ consumerModel.setProxyObject(ref);
+ consumerModel.initMethodModels();
- checkInvokerAvailable();
+ checkInvokerAvailable();
+ } catch (Throwable t) {
+ try {
+ if (invoker != null) {
+ invoker.destroy();
+ }
+ } catch (Throwable destroy) {
+ logger.warn("Unexpected error occurred when destroy invoker of ReferenceConfig(" + url + ").", destroy);
+ }
+ if (consumerModel != null) {
+ ModuleServiceRepository repository = getScopeModel().getServiceRepository();
+ repository.unregisterConsumer(consumerModel);
+ }
+ initialized = false;
+ invoker = null;
+ ref = null;
+ consumerModel = null;
+ serviceMetadata.setTarget(null);
+ serviceMetadata.getAttributeMap().remove(PROXY_CLASS_REF);
+
+ throw t;
+ }
+ initialized = true;
}
private void initServiceAppsMapping(Map<String, String> referenceParameters) {
@@ -532,17 +553,16 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
private void checkInvokerAvailable() throws IllegalStateException {
if (shouldCheck() && !invoker.isAvailable()) {
- invoker.destroy();
- throw new IllegalStateException("Failed to check the status of the service "
- + interfaceName
- + ". No provider available for the service "
- + (group == null ? "" : group + "/")
- + interfaceName +
- (version == null ? "" : ":" + version)
- + " from the url "
- + invoker.getUrl()
- + " to the consumer "
- + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
+ throw new IllegalStateException("Failed to check the status of the service "
+ + interfaceName
+ + ". No provider available for the service "
+ + (group == null ? "" : group + "/")
+ + interfaceName +
+ (version == null ? "" : ":" + version)
+ + " from the url "
+ + invoker.getUrl()
+ + " to the consumer "
+ + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index cbe691e1fc..35995f3c45 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -122,6 +122,15 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> {
}
}
+ @Override
+ public void destroy() {
+ super.destroy();
+ if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
+ getConsumerConfigurationListener(moduleModel).removeNotifyListener(this);
+ referenceConfigurationListener.stop();
+ }
+ }
+
@Override
public void buildRouterChain(URL url) {
this.setRouterChain(RouterChain.buildChain(getInterface(), url.addParameter(REGISTRY_TYPE_KEY, SERVICE_REGISTRY_TYPE)));
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 57c36cfc0f..d169880173 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -39,7 +39,6 @@ import org.apache.dubbo.rpc.cluster.Router;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.cluster.router.state.BitList;
import org.apache.dubbo.rpc.cluster.support.ClusterUtils;
-import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ModuleModel;
import java.util.ArrayList;
@@ -75,7 +74,7 @@ import static org.apache.dubbo.common.constants.RegistryConstants.ROUTE_PROTOCOL
import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
import static org.apache.dubbo.rpc.Constants.MOCK_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.ROUTER_KEY;
-import static org.apache.dubbo.rpc.model.ScopeModelUtil.getApplicationModel;
+import static org.apache.dubbo.rpc.model.ScopeModelUtil.getModuleModel;
/**
@@ -97,19 +96,21 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
* The initial value is null and the midway may be assigned to null, please use the local variable reference
*/
protected volatile Set<URL> cachedInvokerUrls;
- private final ApplicationModel applicationModel;
+ private final ModuleModel moduleModel;
public RegistryDirectory(Class<T> serviceType, URL url) {
super(serviceType, url);
- applicationModel = getApplicationModel(url.getScopeModel());
- consumerConfigurationListener = getConsumerConfigurationListener(url.getOrDefaultModuleModel());
+ moduleModel = getModuleModel(url.getScopeModel());
+ consumerConfigurationListener = getConsumerConfigurationListener(moduleModel);
}
@Override
public void subscribe(URL url) {
super.subscribe(url);
- consumerConfigurationListener.addNotifyListener(this);
- referenceConfigurationListener = new ReferenceConfigurationListener(url.getOrDefaultModuleModel(), this, url);
+ if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, org.apache.dubbo.registry.Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
+ consumerConfigurationListener.addNotifyListener(this);
+ referenceConfigurationListener = new ReferenceConfigurationListener(moduleModel, this, url);
+ }
}
private ConsumerConfigurationListener getConsumerConfigurationListener(ModuleModel moduleModel) {
@@ -120,8 +121,23 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
@Override
public void unSubscribe(URL url) {
super.unSubscribe(url);
- consumerConfigurationListener.removeNotifyListener(this);
- referenceConfigurationListener.stop();
+ if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, org.apache.dubbo.registry.Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
+ consumerConfigurationListener.removeNotifyListener(this);
+ if (referenceConfigurationListener != null) {
+ referenceConfigurationListener.stop();
+ }
+ }
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ if (moduleModel.getModelEnvironment().getConfiguration().convert(Boolean.class, org.apache.dubbo.registry.Constants.ENABLE_CONFIGURATION_LISTEN, true)) {
+ consumerConfigurationListener.removeNotifyListener(this);
+ if (referenceConfigurationListener != null) {
+ referenceConfigurationListener.stop();
+ }
+ }
}
@Override
@@ -131,10 +147,10 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
}
Map<String, List<URL>> categoryUrls = urls.stream()
- .filter(Objects::nonNull)
- .filter(this::isValidCategory)
- .filter(this::isNotCompatibleFor26x)
- .collect(Collectors.groupingBy(this::judgeCategory));
+ .filter(Objects::nonNull)
+ .filter(this::isValidCategory)
+ .filter(this::isNotCompatibleFor26x)
+ .collect(Collectors.groupingBy(this::judgeCategory));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
@@ -150,7 +166,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
- providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
+ providerURLs = addressListener.notify(providerURLs, getConsumerUrl(), this);
}
}
refreshOverrideAndInvoker(providerURLs);
@@ -194,14 +210,14 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
if (invokerUrls.size() == 1
- && invokerUrls.get(0) != null
- && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
+ && invokerUrls.get(0) != null
+ && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // Forbid to access
routerChain.setInvokers(BitList.emptyList());
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
-
+
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
@@ -240,7 +256,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
*/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
- .toString()));
+ .toString()));
return;
}
@@ -350,8 +366,8 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
}
if (!getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
- " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
- " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
+ " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
+ " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
getUrl().getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
@@ -393,7 +409,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
if (providerUrl instanceof ServiceAddressURL) {
providerUrl = overrideWithConfigurator(providerUrl);
} else {
- providerUrl = applicationModel.getBeanFactory().getBean(ClusterUtils.class).mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters
+ providerUrl = moduleModel.getApplicationModel().getBeanFactory().getBean(ClusterUtils.class).mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters
providerUrl = overrideWithConfigurator(providerUrl);
providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!
}
@@ -404,7 +420,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
}
if ((providerUrl.getPath() == null || providerUrl.getPath()
- .length() == 0) && DUBBO_PROTOCOL.equals(providerUrl.getProtocol())) { // Compatible version 1.0
+ .length() == 0) && DUBBO_PROTOCOL.equals(providerUrl.getProtocol())) { // Compatible version 1.0
//fix by tony.chenl DUBBO-44
String path = directoryUrl.getServiceInterface();
if (path != null) {
@@ -446,18 +462,18 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
String appName = interfaceAddressURL.getApplication();
String side = interfaceAddressURL.getSide();
overriddenURL = URLBuilder.from(interfaceAddressURL)
- .clearParameters()
- .addParameter(APPLICATION_KEY, appName)
- .addParameter(SIDE_KEY, side).build();
+ .clearParameters()
+ .addParameter(APPLICATION_KEY, appName)
+ .addParameter(SIDE_KEY, side).build();
}
for (Configurator configurator : configurators) {
overriddenURL = configurator.configure(overriddenURL);
}
url = new DubboServiceAddressURL(
- interfaceAddressURL.getUrlAddress(),
- interfaceAddressURL.getUrlParam(),
- interfaceAddressURL.getConsumerURL(),
- (ServiceConfigURL) overriddenURL);
+ interfaceAddressURL.getUrlAddress(),
+ interfaceAddressURL.getUrlParam(),
+ interfaceAddressURL.getConsumerURL(),
+ (ServiceConfigURL) overriddenURL);
} else {
for (Configurator configurator : configurators) {
url = configurator.configure(url);
@@ -526,13 +542,13 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> {
private boolean isValidCategory(URL url) {
String category = url.getCategory(DEFAULT_CATEGORY);
if ((ROUTERS_CATEGORY.equals(category) || ROUTE_PROTOCOL.equals(url.getProtocol())) ||
- PROVIDERS_CATEGORY.equals(category) ||
- CONFIGURATORS_CATEGORY.equals(category) || DYNAMIC_CONFIGURATORS_CATEGORY.equals(category) ||
- APP_DYNAMIC_CONFIGURATORS_CATEGORY.equals(category)) {
+ PROVIDERS_CATEGORY.equals(category) ||
+ CONFIGURATORS_CATEGORY.equals(category) || DYNAMIC_CONFIGURATORS_CATEGORY.equals(category) ||
+ APP_DYNAMIC_CONFIGURATORS_CATEGORY.equals(category)) {
return true;
}
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " +
- getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
+ getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
return false;
}