You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/10/27 07:50:31 UTC
[dubbo] branch 3.0 updated: [3.0] Improve startup processing of api
usage, auto start module (#9083)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 1bff54b [3.0] Improve startup processing of api usage, auto start module (#9083)
1bff54b is described below
commit 1bff54bad3ef2b5f00f5bec1fa1427f8c2f6604c
Author: Gong Dewei <ky...@qq.com>
AuthorDate: Wed Oct 27 15:50:16 2021 +0800
[3.0] Improve startup processing of api usage, auto start module (#9083)
* Improve startup processing of api usage, auto start module
* Improve ReferenceConfig ref checking
* Fix MetadataServiceExporter
* Fix NPE of attribute map
* Replace checkStarted/checkStarting with checkState of ApplicationDeployer
* Fix testMultiModuleDeployAndReload
* Improve deploy state notification and checking
* Fix check state lock
* Fix start future NPE of ApplicationDeployer
* Add async export/refer tests
* Avoid the internal module waiting on itself
* Improve deploy locks
* Ensure the start future is completed even when handling a state event throws an exception
---
.../dubbo/common/deploy/AbstractDeployer.java | 34 +-
.../dubbo/common/deploy/ApplicationDeployer.java | 12 +-
.../org/apache/dubbo/common/deploy/Deployer.java | 1 +
.../apache/dubbo/common/deploy/ModuleDeployer.java | 5 +-
.../dubbo/config/AbstractInterfaceConfig.java | 4 +-
.../org/apache/dubbo/config/ServiceConfigBase.java | 5 -
.../apache/dubbo/rpc/model/ApplicationModel.java | 29 +-
.../org/apache/dubbo/rpc/model/ModuleModel.java | 17 +-
.../org/apache/dubbo/config/ReferenceConfig.java | 22 +-
.../org/apache/dubbo/config/ServiceConfig.java | 54 ++--
.../config/deploy/DefaultApplicationDeployer.java | 355 +++++++++++----------
.../dubbo/config/deploy/DefaultModuleDeployer.java | 117 ++++---
.../ConfigurableMetadataServiceExporter.java | 11 +-
...ltiInstanceTest.java => MultiInstanceTest.java} | 189 ++++++++++-
.../metadata/MetadataServiceExporterTest.java | 2 -
15 files changed, 561 insertions(+), 296 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/AbstractDeployer.java b/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/AbstractDeployer.java
index f7a02ae..fc46006 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/AbstractDeployer.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/AbstractDeployer.java
@@ -16,6 +16,8 @@
*/
package org.apache.dubbo.common.deploy;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.model.ScopeModel;
import java.util.ArrayList;
@@ -31,6 +33,8 @@ import static org.apache.dubbo.common.deploy.DeployState.STOPPING;
public abstract class AbstractDeployer<E extends ScopeModel> implements Deployer<E> {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractDeployer.class);
+
private volatile DeployState state = PENDING;
protected AtomicBoolean initialized = new AtomicBoolean(false);
@@ -100,34 +104,54 @@ public abstract class AbstractDeployer<E extends ScopeModel> implements Deployer
protected void setStarting() {
this.state = STARTING;
for (DeployListener<E> listener : listeners) {
- listener.onStarting(scopeModel);
+ try {
+ listener.onStarting(scopeModel);
+ } catch (Throwable e) {
+ logger.error(getIdentifier() + " an exception occurred when handle starting event", e);
+ }
}
}
protected void setStarted() {
this.state = STARTED;
for (DeployListener<E> listener : listeners) {
- listener.onStarted(scopeModel);
+ try {
+ listener.onStarted(scopeModel);
+ } catch (Throwable e) {
+ logger.error(getIdentifier() + " an exception occurred when handle started event", e);
+ }
}
}
protected void setStopping() {
this.state = STOPPING;
for (DeployListener<E> listener : listeners) {
- listener.onStopping(scopeModel);
+ try {
+ listener.onStopping(scopeModel);
+ } catch (Throwable e) {
+ logger.error(getIdentifier() + " an exception occurred when handle stopping event", e);
+ }
}
}
protected void setStopped() {
this.state = STOPPED;
for (DeployListener<E> listener : listeners) {
- listener.onStopped(scopeModel);
+ try {
+ listener.onStopped(scopeModel);
+ } catch (Throwable e) {
+ logger.error(getIdentifier() + " an exception occurred when handle stopped event", e);
+ }
}
}
protected void setFailed(Throwable cause) {
this.state = FAILED;
for (DeployListener<E> listener : listeners) {
- listener.onFailure(scopeModel, cause);
+ try {
+ listener.onFailure(scopeModel, cause);
+ } catch (Throwable e) {
+ logger.error(getIdentifier() + " an exception occurred when handle failed event", e);
+ }
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ApplicationDeployer.java b/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ApplicationDeployer.java
index 5673224..94f3483 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ApplicationDeployer.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ApplicationDeployer.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.common.deploy;
import org.apache.dubbo.common.config.ReferenceCache;
import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
import java.util.concurrent.Future;
@@ -42,6 +43,8 @@ public interface ApplicationDeployer extends Deployer<ApplicationModel> {
*/
void stop() throws IllegalStateException;
+ Future getStartFuture();
+
/**
* Register application instance and start internal services
*/
@@ -71,8 +74,11 @@ public interface ApplicationDeployer extends Deployer<ApplicationModel> {
*/
boolean isBackground();
- void checkStarting();
-
- void checkStarted();
+ /**
+ * check all module state and update application state
+ */
+ void checkState();
+ // module state changed callbacks
+ void notifyModuleChanged(ModuleModel moduleModel, DeployState state);
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/Deployer.java b/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/Deployer.java
index 568dda2..531adcc 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/Deployer.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/Deployer.java
@@ -89,4 +89,5 @@ public interface Deployer<E extends ScopeModel> {
void removeDeployListener(DeployListener<E> listener);
+ String getIdentifier();
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ModuleDeployer.java b/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ModuleDeployer.java
index 6c32be8..80d36d2 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ModuleDeployer.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ModuleDeployer.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.common.deploy;
import org.apache.dubbo.common.config.ReferenceCache;
-import org.apache.dubbo.config.ServiceConfigBase;
import org.apache.dubbo.rpc.model.ModuleModel;
import java.util.concurrent.Future;
@@ -31,6 +30,8 @@ public interface ModuleDeployer extends Deployer<ModuleModel> {
Future start() throws IllegalStateException;
+ Future getStartFuture();
+
void stop() throws IllegalStateException;
void preDestroy() throws IllegalStateException;
@@ -45,8 +46,6 @@ public interface ModuleDeployer extends Deployer<ModuleModel> {
void setPending();
- void notifyExportService(ServiceConfigBase<?> sc);
-
/**
* Whether start in background, do not await finish
*/
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java
index 7a75205..6f32849 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractInterfaceConfig.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.common.bytecode.Wrapper;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.config.Environment;
import org.apache.dubbo.common.config.InmemoryConfiguration;
+import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConfigUtils;
@@ -268,7 +269,8 @@ public abstract class AbstractInterfaceConfig extends AbstractMethodConfig {
protected void appendMetricsCompatible(Map<String, String> map) {
MetricsConfig metricsConfig = getConfigManager().getMetrics().orElse(null);
if (metricsConfig != null) {
- if (!metricsConfig.getProtocol().equals(PROTOCOL_PROMETHEUS)) {
+ if (metricsConfig.getProtocol() != null && !StringUtils.isEquals(metricsConfig.getProtocol(), PROTOCOL_PROMETHEUS)) {
+ Assert.notEmptyString(metricsConfig.getPort(), "Metrics port cannot be null");
map.put("metrics.protocol", metricsConfig.getProtocol());
map.put("metrics.port", metricsConfig.getPort());
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
index 0d4ce7f..1ba6082 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
@@ -457,11 +457,6 @@ public abstract class ServiceConfigBase<T> extends AbstractServiceConfig {
*/
public abstract void export();
- /**
- * export service only, do not register application instance, for exporting services in batches by module
- */
- public abstract void exportOnly();
-
public abstract void unexport();
public abstract boolean isExported();
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java
index e5f1e82..af3ce68 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ApplicationModel.java
@@ -200,8 +200,10 @@ public class ApplicationModel extends ScopeModel {
frameworkModel.addApplication(this);
initialize();
// bind to default instance if absent
- if (defaultInstance == null) {
- defaultInstance = this;
+ synchronized (ApplicationModel.class) {
+ if (defaultInstance == null) {
+ defaultInstance = this;
+ }
}
}
@@ -235,7 +237,17 @@ public class ApplicationModel extends ScopeModel {
@Override
protected void onDestroy() {
+ // 1. remove from frameworkModel
+ if (defaultInstance == this) {
+ synchronized (ApplicationModel.class) {
+ frameworkModel.removeApplication(this);
+ defaultInstance = null;
+ }
+ } else {
+ frameworkModel.removeApplication(this);
+ }
+ // 2. pre-destroy, set stopping
if (deployer != null) {
deployer.preDestroy();
}
@@ -249,15 +261,7 @@ public class ApplicationModel extends ScopeModel {
// destroy internal module later
internalModule.destroy();
- if (defaultInstance == this) {
- synchronized (ApplicationModel.class) {
- frameworkModel.removeApplication(this);
- defaultInstance = null;
- }
- } else {
- frameworkModel.removeApplication(this);
- }
-
+ // post-destroy, release registry resources
if (deployer != null) {
deployer.postDestroy();
}
@@ -277,7 +281,8 @@ public class ApplicationModel extends ScopeModel {
serviceRepository.destroy();
serviceRepository = null;
}
- // try destroy framework if no any application
+
+ // destroy framework if none application
frameworkModel.tryDestroy();
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ModuleModel.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ModuleModel.java
index d0d5b65..0b335cf 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ModuleModel.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ModuleModel.java
@@ -18,6 +18,8 @@ package org.apache.dubbo.rpc.model;
import org.apache.dubbo.common.config.ModuleEnvironment;
import org.apache.dubbo.common.context.ModuleExt;
+import org.apache.dubbo.common.deploy.ApplicationDeployer;
+import org.apache.dubbo.common.deploy.DeployState;
import org.apache.dubbo.common.deploy.ModuleDeployer;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.extension.ExtensionScope;
@@ -55,6 +57,12 @@ public class ModuleModel extends ScopeModel {
Assert.notNull(serviceRepository, "ModuleServiceRepository can not be null");
Assert.notNull(moduleConfigManager, "ModuleConfigManager can not be null");
Assert.assertTrue(moduleConfigManager.isInitialized(), "ModuleConfigManager can not be initialized");
+
+ // notify application check state
+ ApplicationDeployer applicationDeployer = applicationModel.getDeployer();
+ if (applicationDeployer != null) {
+ applicationDeployer.notifyModuleChanged(this, DeployState.PENDING);
+ }
}
@Override
@@ -82,12 +90,15 @@ public class ModuleModel extends ScopeModel {
@Override
protected void onDestroy() {
+ // 1. remove from applicationModel
+ applicationModel.removeModule(this);
+
+ // 2. set stopping
if (deployer != null) {
deployer.preDestroy();
}
- applicationModel.removeModule(this);
-
+ // 3. release services
if (deployer != null) {
deployer.postDestroy();
}
@@ -104,6 +115,8 @@ public class ModuleModel extends ScopeModel {
moduleEnvironment.destroy();
moduleEnvironment = null;
}
+
+ // destroy application if none pub module
applicationModel.tryDestroy();
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index 47d2c82..51079e4 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -39,7 +39,6 @@ import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.cluster.support.ClusterUtils;
import org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster;
-import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.AsyncMethodInfo;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.ModuleServiceRepository;
@@ -194,13 +193,20 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
}
@Override
- public synchronized T get() {
+ public T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
- init();
+ // ensure start module, compatible with old api usage
+ getScopeModel().getDeployer().start();
+
+ synchronized (this) {
+ if (ref == null) {
+ init();
+ }
+ }
}
return ref;
@@ -233,13 +239,7 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
if (initialized) {
return;
}
-
- if (getScopeModel() == null) {
- setScopeModel(ApplicationModel.defaultModel().getDefaultModule());
- }
-
- // prepare application for reference
- getScopeModel().getDeployer().prepare();
+ initialized = true;
if (!this.isRefreshed()) {
this.refresh();
@@ -271,8 +271,6 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
consumerModel.setProxyObject(ref);
consumerModel.initMethodModels();
- initialized = true;
-
checkInvokerAvailable();
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index 663836a..60b7fc8 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.config;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.Version;
-import org.apache.dubbo.common.deploy.ModuleDeployer;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -204,49 +203,30 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
}
@Override
- public synchronized void export() {
+ public void export() {
if (this.exported) {
return;
}
- // prepare for export
- ModuleDeployer moduleDeployer = getScopeModel().getDeployer();
- moduleDeployer.prepare();
- if (!this.isRefreshed()) {
- this.refresh();
- }
- if (this.shouldExport()) {
- this.init();
+ // ensure start module, compatible with old api usage
+ getScopeModel().getDeployer().start();
- if (shouldDelay()) {
- doDelayExport();
- } else {
- doExport();
+ synchronized (this) {
+ if (this.exported) {
+ return;
}
- // notify export this service
- moduleDeployer.notifyExportService(this);
- }
- }
-
- /**
- * export service only, do not register application instance, for exporting services in batches by module
- */
- @Override
- public synchronized void exportOnly() {
- if (this.exported) {
- return;
- }
- if (!this.isRefreshed()) {
- this.refresh();
- }
- if (this.shouldExport()) {
- this.init();
+ if (!this.isRefreshed()) {
+ this.refresh();
+ }
+ if (this.shouldExport()) {
+ this.init();
- if (shouldDelay()) {
- doDelayExport();
- } else {
- doExport();
+ if (shouldDelay()) {
+ doDelayExport();
+ } else {
+ doExport();
+ }
}
}
}
@@ -403,6 +383,8 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
Map<String, String> map = buildAttributes(protocolConfig);
+ // remove null key and null value
+ map.keySet().removeIf(key -> key == null || map.get(key) == null);
//init serviceMetadata attachments
serviceMetadata.getAttachments().putAll(map);
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
index 81c718f..41caec1 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
@@ -107,6 +107,7 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
private volatile ServiceInstance serviceInstance;
private AtomicBoolean hasPreparedApplicationInstance = new AtomicBoolean(false);
+ private AtomicBoolean hasPreparedInternalModule = new AtomicBoolean(false);
private volatile MetadataService metadataService;
@@ -116,7 +117,10 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
private String identifier;
private volatile CompletableFuture startFuture;
private DubboShutdownHook dubboShutdownHook;
- private Object startedLock = new Object();
+ private Object stateLock = new Object();
+ private Object startLock = new Object();
+ private Object destroyLock = new Object();
+ private Object internalModuleLock = new Object();
public DefaultApplicationDeployer(ApplicationModel applicationModel) {
super(applicationModel);
@@ -185,7 +189,7 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
return;
}
// Ensure that the initialization is completed when concurrent calls
- synchronized (this) {
+ synchronized (startLock) {
if (initialized.get()) {
return;
}
@@ -218,7 +222,9 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
private void initModuleDeployers() {
// make sure created default module
applicationModel.getDefaultModule();
- for (ModuleModel moduleModel : applicationModel.getModuleModels()) {
+ // copy modules and initialize avoid ConcurrentModificationException if add new module
+ List<ModuleModel> moduleModels = new ArrayList<>(applicationModel.getModuleModels());
+ for (ModuleModel moduleModel : moduleModels) {
moduleModel.getDeployer().initialize();
}
}
@@ -512,35 +518,40 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
* @return
*/
@Override
- public synchronized Future start() {
- CompletableFuture startFuture = getStartFuture();
+ public Future start() {
+ synchronized (startLock) {
+ if (isStopping() || isStopped() || isFailed()) {
+ throw new IllegalStateException(getIdentifier() + " is stopping or stopped, can not start again");
+ }
- // maybe call start again after add new module, check if any new module
- boolean hasPendingModule = hasPendingModule();
+ // maybe call start again after add new module, check if any new module
+ boolean hasPendingModule = hasPendingModule();
- if (isStarting()) {
- // currently is starting, maybe both start by module and application
- // if has new modules, start them
- if (hasPendingModule) {
- startModules();
+ if (isStarting()) {
+ // currently is starting, maybe both start by module and application
+ // if has new modules, start them
+ if (hasPendingModule) {
+ startModules();
+ }
+ // if is starting, reuse previous startFuture
+ return startFuture;
}
- // if is starting, reuse previous startFuture
- return startFuture;
- }
- // if is started and no new module, just return
- if (isStarted() && !hasPendingModule) {
- completeStartFuture(false);
- return startFuture;
- }
+ // if is started and no new module, just return
+ if (isStarted() && !hasPendingModule) {
+ return CompletableFuture.completedFuture(false);
+ }
- onStarting();
+ // pending -> starting : first start app
+ // started -> starting : re-start app
+ onStarting();
- initialize();
+ initialize();
- doStart();
+ doStart();
- return startFuture;
+ return startFuture;
+ }
}
private boolean hasPendingModule() {
@@ -554,18 +565,11 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
return found;
}
- private CompletableFuture getStartFuture() {
- if (startFuture == null) {
- synchronized (this) {
- if (startFuture == null) {
- startFuture = new CompletableFuture();
- }
- }
- }
+ @Override
+ public Future getStartFuture() {
return startFuture;
}
-
private void doStart() {
startModules();
@@ -573,70 +577,72 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
prepareApplicationInstance();
executorRepository.getSharedExecutor().submit(() -> {
- while (true) {
- // notify on each module started
- synchronized (startedLock) {
- try {
- startedLock.wait(500);
- } catch (InterruptedException e) {
- // ignore
+ try {
+ while (isStarting()) {
+ // notify when any module state changed
+ synchronized (stateLock) {
+ try {
+ stateLock.wait(500);
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
- }
-
- // if has new module, do start again
- if (hasPendingModule()) {
- startModules();
- continue;
- }
- DeployState newState = checkState();
- if (!(newState == DeployState.STARTING || newState == DeployState.PENDING)) {
- // start finished or error
- break;
+ // if has new module, do start again
+ if (hasPendingModule()) {
+ startModules();
+ }
}
+ } catch (Exception e) {
+ logger.warn("waiting for application startup occurred an exception", e);
}
});
}
private void startModules() {
- // copy current modules, ignore new module during starting
- List<ModuleModel> moduleModels = new ArrayList<>(applicationModel.getModuleModels());
- for (ModuleModel moduleModel : moduleModels) {
- // export services in module
- if (moduleModel.getDeployer().isPending()) {
- moduleModel.getDeployer().start();
- }
- }
+ // ensure init and start internal module first
+ prepareInternalModule();
+
+ // filter and start pending modules, ignore new module during starting
+ applicationModel.getModuleModels().stream()
+ .filter(moduleModel -> moduleModel.getDeployer().isPending())
+ .forEach(moduleModel -> moduleModel.getDeployer().start());
}
@Override
public void prepareApplicationInstance() {
+ // ensure init and start internal module first
+ prepareInternalModule();
+
if (hasPreparedApplicationInstance.get()) {
return;
}
// if register consumer instance or has exported services
if (isRegisterConsumerInstance() || hasExportedServices()) {
- if (!hasPreparedApplicationInstance.compareAndSet(false, true)) {
- return;
+ if (hasPreparedApplicationInstance.compareAndSet(false, true)) {
+ // register the local ServiceInstance if required
+ registerServiceInstance();
}
- prepareInternalModule();
- // register the local ServiceInstance if required
- registerServiceInstance();
}
}
private void prepareInternalModule() {
- // export MetadataService
- exportMetadataService();
- // start internal module
- ModuleDeployer internalModuleDeployer = applicationModel.getInternalModule().getDeployer();
- if (!internalModuleDeployer.isStarted()) {
- Future future = internalModuleDeployer.start();
- // wait for internal module start finished
- try {
- future.get();
- } catch (Exception e) {
- logger.warn("wait for internal module started failed: " + e.getMessage(), e);
+ synchronized (internalModuleLock) {
+ if (!hasPreparedInternalModule.compareAndSet(false, true)) {
+ return;
+ }
+ // export MetadataService
+ exportMetadataService();
+ // start internal module
+ ModuleDeployer internalModuleDeployer = applicationModel.getInternalModule().getDeployer();
+ if (!internalModuleDeployer.isStarted()) {
+ Future future = internalModuleDeployer.start();
+ // wait for internal module startup
+ try {
+ future.get(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ logger.warn("wait for internal module startup failed: " + e.getMessage(), e);
+ }
}
}
}
@@ -834,45 +840,48 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
@Override
public void preDestroy() {
- if (isStopping() || isStopped()) {
- return;
- }
- onStopping();
+ synchronized(destroyLock) {
+ if (isStopping() || isStopped()) {
+ return;
+ }
+ onStopping();
- unRegisterShutdownHook();
- if (asyncMetadataFuture != null) {
- asyncMetadataFuture.cancel(true);
+ unRegisterShutdownHook();
+ if (asyncMetadataFuture != null) {
+ asyncMetadataFuture.cancel(true);
+ }
+ unregisterServiceInstance();
+ unexportMetadataService();
}
- unregisterServiceInstance();
- unexportMetadataService();
-
}
@Override
- public synchronized void postDestroy() {
- // expect application model is destroyed before here
- if (isStopped()) {
- return;
- }
- try {
- executeShutdownCallbacks();
+ public void postDestroy() {
+ synchronized(destroyLock) {
+ // expect application model is destroyed before here
+ if (isStopped()) {
+ return;
+ }
+ try {
+ executeShutdownCallbacks();
- destroyRegistries();
- destroyServiceDiscoveries();
- destroyMetadataReports();
+ destroyRegistries();
+ destroyServiceDiscoveries();
+ destroyMetadataReports();
- // TODO should we close unused protocol server which only used by this application?
- // protocol server will be closed on all applications of same framework are stopped currently, but no associate to application
- // see org.apache.dubbo.config.deploy.FrameworkModelCleaner#destroyProtocols
- // see org.apache.dubbo.config.bootstrap.DubboBootstrapMultiInstanceTest#testMultiProviderApplicationStopOneByOne
+ // TODO should we close unused protocol server which only used by this application?
+ // protocol server will be closed on all applications of same framework are stopped currently, but no associate to application
+ // see org.apache.dubbo.config.deploy.FrameworkModelCleaner#destroyProtocols
+ // see org.apache.dubbo.config.bootstrap.DubboBootstrapMultiInstanceTest#testMultiProviderApplicationStopOneByOne
- // destroy all executor services
- destroyExecutorRepository();
+ // destroy all executor services
+ destroyExecutorRepository();
- onStopped();
- } catch (Throwable ex) {
- logger.error(getIdentifier() + " an error occurred while stopping application: " + ex.getMessage(), ex);
- setFailed(ex);
+ onStopped();
+ } catch (Throwable ex) {
+ logger.error(getIdentifier() + " an error occurred while stopping application: " + ex.getMessage(), ex);
+ setFailed(ex);
+ }
}
}
@@ -882,41 +891,41 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
}
@Override
- public void checkStarting() {
- if (isStarting()) {
- return;
- }
- for (ModuleModel moduleModel : applicationModel.getModuleModels()) {
- if (moduleModel.getDeployer().isStarting()) {
- onStarting();
- break;
- }
+ public void notifyModuleChanged(ModuleModel moduleModel, DeployState state) {
+ checkState();
+
+ // notify module state changed or module changed
+ synchronized (stateLock) {
+ stateLock.notifyAll();
}
}
@Override
- public void checkStarted() {
- // TODO improve newState checking
- DeployState newState = checkState();
- switch (newState) {
- case STARTED:
- onStarted();
- break;
- case STARTING:
- onStarting();
- break;
- case PENDING:
- setPending();
- break;
- }
-
- // notify started
- synchronized (startedLock) {
- startedLock.notifyAll();
+ public void checkState() {
+ synchronized (stateLock) {
+ DeployState newState = calculateState();
+ switch (newState) {
+ case STARTED:
+ onStarted();
+ break;
+ case STARTING:
+ onStarting();
+ break;
+ case STOPPING:
+ onStopping();
+ break;
+ case STOPPED:
+ onStopped();
+ break;
+ case PENDING:
+ // cannot change to pending from other state
+ // setPending();
+ break;
+ }
}
}
- private DeployState checkState() {
+ private DeployState calculateState() {
DeployState newState = DeployState.UNKNOWN;
int pending = 0, starting = 0, started = 0, stopping = 0, stopped = 0;
for (ModuleModel moduleModel : applicationModel.getModuleModels()) {
@@ -966,64 +975,77 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
}
private void onStarting() {
- if (isStarting()) {
+ // pending -> starting
+ // started -> starting
+ if (!(isPending() || isStarted())) {
return;
}
setStarting();
+ startFuture = new CompletableFuture();
if (logger.isInfoEnabled()) {
logger.info(getIdentifier() + " is starting.");
}
}
private void onStarted() {
- if (isStarted()) {
- return;
- }
- setStarted();
- if (logger.isInfoEnabled()) {
- logger.info(getIdentifier() + " is ready.");
- }
- // refresh metadata
try {
- if (serviceInstance != null) {
- ServiceInstanceMetadataUtils.refreshMetadataAndInstance(serviceInstance);
+ // starting -> started
+ if (!isStarting()) {
+ return;
}
- } catch (Exception e) {
- logger.error("refresh metadata failed: " + e.getMessage(), e);
+ setStarted();
+ if (logger.isInfoEnabled()) {
+ logger.info(getIdentifier() + " is ready.");
+ }
+ // refresh metadata
+ try {
+ if (serviceInstance != null) {
+ ServiceInstanceMetadataUtils.refreshMetadataAndInstance(serviceInstance);
+ }
+ } catch (Exception e) {
+ logger.error("refresh metadata failed: " + e.getMessage(), e);
+ }
+ // shutdown export/refer executor after started
+ executorRepository.shutdownServiceExportExecutor();
+ executorRepository.shutdownServiceReferExecutor();
+ } finally {
+ // complete future
+ completeStartFuture(true);
}
- // complete future
- completeStartFuture(true);
- // shutdown export/refer executor after started
- executorRepository.shutdownServiceExportExecutor();
- executorRepository.shutdownServiceReferExecutor();
}
private void completeStartFuture(boolean success) {
if (startFuture != null) {
startFuture.complete(success);
- startFuture = null;
}
}
private void onStopping() {
- if (isStopping()) {
- return;
- }
- setStopping();
- if (logger.isInfoEnabled()) {
- logger.info(getIdentifier() + " is stopping.");
+ try {
+ if (isStopping() || isStopped()) {
+ return;
+ }
+ setStopping();
+ if (logger.isInfoEnabled()) {
+ logger.info(getIdentifier() + " is stopping.");
+ }
+ } finally {
+ completeStartFuture(false);
}
}
private void onStopped() {
- if (isStopped()) {
- return;
- }
- setStopped();
- if (logger.isInfoEnabled()) {
- logger.info(getIdentifier() + " has stopped.");
+ try {
+ if (isStopped()) {
+ return;
+ }
+ setStopped();
+ if (logger.isInfoEnabled()) {
+ logger.info(getIdentifier() + " has stopped.");
+ }
+ } finally {
+ completeStartFuture(false);
}
-
}
private void destroyExecutorRepository() {
@@ -1059,7 +1081,8 @@ public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationMode
return configManager.getApplicationOrElseThrow();
}
- private String getIdentifier() {
+ @Override
+ public String getIdentifier() {
if (identifier == null) {
identifier = "Dubbo application[" + applicationModel.getInternalId() + "]";
if (applicationModel.getModelName() != null
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
index 530c1ba..eef3d29 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.config.deploy;
import org.apache.dubbo.common.config.ReferenceCache;
import org.apache.dubbo.common.deploy.AbstractDeployer;
import org.apache.dubbo.common.deploy.ApplicationDeployer;
+import org.apache.dubbo.common.deploy.DeployState;
import org.apache.dubbo.common.deploy.ModuleDeployListener;
import org.apache.dubbo.common.deploy.ModuleDeployer;
import org.apache.dubbo.common.logger.Logger;
@@ -72,6 +73,8 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
private Boolean background;
private Boolean exportAsync;
private Boolean referAsync;
+ private CompletableFuture<?> exportFuture;
+ private CompletableFuture<?> referFuture;
public DefaultModuleDeployer(ModuleModel moduleModel) {
@@ -122,23 +125,26 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
@Override
public synchronized Future start() throws IllegalStateException {
+ if (isStopping() || isStopped() || isFailed()) {
+ throw new IllegalStateException(getIdentifier() + " is stopping or stopped, can not start again");
+ }
+
if (isStarting() || isStarted()) {
return startFuture;
}
onModuleStarting();
- startFuture = new CompletableFuture();
-
- applicationDeployer.initialize();
// initialize
+ applicationDeployer.initialize();
initialize();
// export services
exportServices();
// prepare application instance
- if (hasExportedServices()) {
+ // exclude the internal module so that it does not wait on itself
+ if (moduleModel != moduleModel.getApplicationModel().getInternalModule()) {
applicationDeployer.prepareApplicationInstance();
}
@@ -146,19 +152,26 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
referServices();
executorRepository.getSharedExecutor().submit(() -> {
-
- // wait for export finish
- waitExportFinish();
-
- // wait for refer finish
- waitReferFinish();
-
- onModuleStarted(startFuture);
+ try {
+ // wait for export finish
+ waitExportFinish();
+ // wait for refer finish
+ waitReferFinish();
+ } catch (Throwable e) {
+ logger.warn("wait for export/refer services occurred an exception", e);
+ } finally {
+ onModuleStarted();
+ }
});
return startFuture;
}
+ @Override
+ public Future getStartFuture() {
+ return startFuture;
+ }
+
private boolean hasExportedServices() {
return configManager.getServices().size() > 0;
}
@@ -219,26 +232,54 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
private void onModuleStarting() {
setStarting();
+ startFuture = new CompletableFuture();
logger.info(getIdentifier() + " is starting.");
- applicationDeployer.checkStarting();
+ applicationDeployer.notifyModuleChanged(moduleModel, DeployState.STARTING);
+ }
+
+ private void onModuleStarted() {
+ try {
+ if (isStarting()) {
+ setStarted();
+ logger.info(getIdentifier() + " has started.");
+ applicationDeployer.notifyModuleChanged(moduleModel, DeployState.STARTED);
+ }
+ } finally {
+ // complete module start future after application state changed
+ completeStartFuture(true);
+ }
}
- private void onModuleStarted(CompletableFuture startFuture) {
- setStarted();
- logger.info(getIdentifier() + " has started.");
- applicationDeployer.checkStarted();
- // complete module start future after application state changed, fix #9012 ?
- startFuture.complete(true);
+ private void completeStartFuture(boolean value) {
+ if (startFuture != null && !startFuture.isDone()) {
+ startFuture.complete(value);
+ }
+ if (exportFuture != null && !exportFuture.isDone()) {
+ exportFuture.cancel(true);
+ }
+ if (referFuture != null && !referFuture.isDone()) {
+ referFuture.cancel(true);
+ }
}
private void onModuleStopping() {
- setStopping();
- logger.info(getIdentifier() + " is stopping.");
+ try {
+ setStopping();
+ logger.info(getIdentifier() + " is stopping.");
+ applicationDeployer.notifyModuleChanged(moduleModel, DeployState.STOPPING);
+ } finally {
+ completeStartFuture(false);
+ }
}
private void onModuleStopped() {
- setStopped();
- logger.info(getIdentifier() + " has stopped.");
+ try {
+ setStopped();
+ logger.info(getIdentifier() + " has stopped.");
+ applicationDeployer.notifyModuleChanged(moduleModel, DeployState.STOPPED);
+ } finally {
+ completeStartFuture(false);
+ }
}
private void loadConfigs() {
@@ -266,7 +307,7 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
if (!sc.isExported()) {
- sc.exportOnly();
+ sc.export();
exportedServices.add(sc);
}
} catch (Throwable t) {
@@ -277,7 +318,7 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
asyncExportingFutures.add(future);
} else {
if (!sc.isExported()) {
- sc.exportOnly();
+ sc.export();
exportedServices.add(sc);
}
}
@@ -349,10 +390,10 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
private void waitExportFinish() {
try {
logger.info(getIdentifier() + " waiting services exporting ...");
- CompletableFuture<?> future = CompletableFuture.allOf(asyncExportingFutures.toArray(new CompletableFuture[0]));
- future.get();
- } catch (Exception e) {
- logger.warn(getIdentifier() + " export services occurred an exception.");
+ exportFuture = CompletableFuture.allOf(asyncExportingFutures.toArray(new CompletableFuture[0]));
+ exportFuture.get();
+ } catch (Throwable e) {
+ logger.warn(getIdentifier() + " export services occurred an exception: " + e.toString());
} finally {
logger.info(getIdentifier() + " export services finished.");
asyncExportingFutures.clear();
@@ -362,10 +403,10 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
private void waitReferFinish() {
try {
logger.info(getIdentifier() + " waiting services referring ...");
- CompletableFuture<?> future = CompletableFuture.allOf(asyncReferringFutures.toArray(new CompletableFuture[0]));
- future.get();
- } catch (Exception e) {
- logger.warn(getIdentifier() + " refer services occurred an exception.");
+ referFuture = CompletableFuture.allOf(asyncReferringFutures.toArray(new CompletableFuture[0]));
+ referFuture.get();
+ } catch (Throwable e) {
+ logger.warn(getIdentifier() + " refer services occurred an exception: " + e.toString());
} finally {
logger.info(getIdentifier() + " refer services finished.");
asyncReferringFutures.clear();
@@ -395,7 +436,7 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
.isPresent();
}
- private String getIdentifier() {
+ public String getIdentifier() {
if (identifier == null) {
identifier = "Dubbo module[" + moduleModel.getInternalId() + "]";
if (moduleModel.getModelName() != null
@@ -420,12 +461,4 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
this.initialize();
}
- /**
- * After export one service, trigger starting application
- */
- @Override
- public void notifyExportService(ServiceConfigBase<?> sc) {
- applicationDeployer.prepareApplicationInstance();
- }
-
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
index b08b31b..23606b6 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/metadata/ConfigurableMetadataServiceExporter.java
@@ -34,6 +34,7 @@ import org.apache.dubbo.rpc.model.ScopeModelAware;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.emptyList;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL;
@@ -61,6 +62,7 @@ public class ConfigurableMetadataServiceExporter implements MetadataServiceExpor
private volatile ServiceConfig<MetadataService> serviceConfig;
private ApplicationModel applicationModel;
+ private AtomicBoolean exported = new AtomicBoolean(false);
public ConfigurableMetadataServiceExporter() {
}
@@ -77,7 +79,7 @@ public class ConfigurableMetadataServiceExporter implements MetadataServiceExpor
@Override
public ConfigurableMetadataServiceExporter export() {
- if (!isExported()) {
+ if (exported.compareAndSet(false, true)) {
ApplicationConfig applicationConfig = getApplicationConfig();
ServiceConfig<MetadataService> serviceConfig = new ServiceConfig<>();
@@ -92,8 +94,8 @@ public class ConfigurableMetadataServiceExporter implements MetadataServiceExpor
serviceConfig.setVersion(metadataService.version());
serviceConfig.setMethods(generateMethodConfig());
- // export
- serviceConfig.exportOnly();
+ // add to internal module, do export later
+ applicationModel.getInternalModule().getConfigManager().addService(serviceConfig);
if (logger.isInfoEnabled()) {
logger.info("The MetadataService exports urls : " + serviceConfig.getExportedUrls());
@@ -137,6 +139,7 @@ public class ConfigurableMetadataServiceExporter implements MetadataServiceExpor
if (isExported()) {
serviceConfig.unexport();
}
+ exported.set(false);
return this;
}
@@ -146,7 +149,7 @@ public class ConfigurableMetadataServiceExporter implements MetadataServiceExpor
}
public boolean isExported() {
- return serviceConfig != null && serviceConfig.isExported() && !serviceConfig.isUnexported();
+ return exported.get();
}
private ApplicationConfig getApplicationConfig() {
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/DubboBootstrapMultiInstanceTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/MultiInstanceTest.java
similarity index 78%
rename from dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/DubboBootstrapMultiInstanceTest.java
rename to dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/MultiInstanceTest.java
index 30cf5cb..b3a98ec 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/DubboBootstrapMultiInstanceTest.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/bootstrap/MultiInstanceTest.java
@@ -17,10 +17,14 @@
package org.apache.dubbo.config.bootstrap;
import org.apache.dubbo.common.deploy.ApplicationDeployer;
+import org.apache.dubbo.common.deploy.DeployListener;
import org.apache.dubbo.common.deploy.DeployState;
import org.apache.dubbo.common.deploy.ModuleDeployer;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ProtocolConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
@@ -36,6 +40,7 @@ import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.FrameworkServiceRepository;
import org.apache.dubbo.rpc.model.ModuleModel;
+import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.test.check.DubboTestChecker;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -46,12 +51,17 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import static org.apache.dubbo.remoting.Constants.EVENT_LOOP_BOSS_POOL_NAME;
-public class DubboBootstrapMultiInstanceTest {
+public class MultiInstanceTest {
+
+ private static final Logger logger = LoggerFactory.getLogger(MultiInstanceTest.class);
private static ZookeeperSingleRegistryCenter registryCenter;
@@ -89,7 +99,7 @@ public class DubboBootstrapMultiInstanceTest {
if (testChecker == null) {
testChecker = new DubboTestChecker();
testChecker.init(null);
- testClassName = DubboBootstrapMultiInstanceTest.class.getName();
+ testClassName = MultiInstanceTest.class.getName();
}
return testChecker.checkUnclosedThreads(testClassName, 0);
}
@@ -511,7 +521,8 @@ public class DubboBootstrapMultiInstanceTest {
.service(serviceConfig2)
.endModule();
- serviceConfig2.getScopeModel().getDeployer().start();
+ // start provider module 2 and wait
+ serviceConfig2.getScopeModel().getDeployer().start().get();
Assertions.assertNull(frameworkServiceRepository.lookupExportedServiceWithoutGroup(serviceKey1));
Assertions.assertNotNull(frameworkServiceRepository.lookupExportedServiceWithoutGroup(serviceKey2));
Assertions.assertNull(frameworkServiceRepository.lookupExportedServiceWithoutGroup(serviceKey3));
@@ -710,6 +721,138 @@ public class DubboBootstrapMultiInstanceTest {
}
}
+ @Test
+ public void testOldApiDeploy() throws Exception {
+
+ try {
+ // provider app
+ ApplicationModel providerApplicationModel = ApplicationModel.defaultModel();
+ ServiceConfig<DemoService> serviceConfig = new ServiceConfig<>();
+ serviceConfig.setScopeModel(providerApplicationModel.getDefaultModule());
+ serviceConfig.setRef(new DemoServiceImpl());
+ serviceConfig.setInterface(DemoService.class);
+ serviceConfig.setApplication(new ApplicationConfig("provider-app"));
+ serviceConfig.setRegistry(new RegistryConfig(registryConfig.getAddress()));
+ // add service
+ //serviceConfig.getScopeModel().getConfigManager().addService(serviceConfig);
+
+ // detect deploy events
+ DeployEventHandler serviceDeployEventHandler = new DeployEventHandler(serviceConfig.getScopeModel());
+ serviceConfig.getScopeModel().getDeployer().addDeployListener(serviceDeployEventHandler);
+ // before starting
+ Map<DeployState, Long> serviceDeployEventMap = serviceDeployEventHandler.deployEventMap;
+ Assertions.assertFalse(serviceDeployEventMap.containsKey(DeployState.STARTING));
+ Assertions.assertFalse(serviceDeployEventMap.containsKey(DeployState.STARTED));
+
+ // export service and start module
+ serviceConfig.export();
+ // expect internal module is started
+ Assertions.assertTrue(providerApplicationModel.getInternalModule().getDeployer().isStarted());
+ // expect service module is starting
+ Assertions.assertTrue(serviceDeployEventMap.containsKey(DeployState.STARTING));
+ // wait for service module started
+ serviceConfig.getScopeModel().getDeployer().getStartFuture().get();
+ Assertions.assertTrue(serviceDeployEventMap.containsKey(DeployState.STARTED));
+
+
+ // consumer app
+ ApplicationModel consumerApplicationModel = new ApplicationModel(FrameworkModel.defaultModel());
+ ReferenceConfig<DemoService> referenceConfig = new ReferenceConfig<>();
+ referenceConfig.setScopeModel(consumerApplicationModel.getDefaultModule());
+ referenceConfig.setApplication(new ApplicationConfig("consumer-app"));
+ referenceConfig.setInterface(DemoService.class);
+ referenceConfig.setRegistry(new RegistryConfig(registryConfig.getAddress()));
+ referenceConfig.setScope("remote");
+
+ // detect deploy events
+ DeployEventHandler referDeployEventHandler = new DeployEventHandler(referenceConfig.getScopeModel());
+ referenceConfig.getScopeModel().getDeployer().addDeployListener(referDeployEventHandler);
+
+ // before starting
+ Map<DeployState, Long> deployEventMap = referDeployEventHandler.deployEventMap;
+ Assertions.assertFalse(deployEventMap.containsKey(DeployState.STARTING));
+ Assertions.assertFalse(deployEventMap.containsKey(DeployState.STARTED));
+
+ // get ref proxy and start module
+ DemoService demoService = referenceConfig.get();
+ // expect internal module is started
+ Assertions.assertTrue(consumerApplicationModel.getInternalModule().getDeployer().isStarted());
+ Assertions.assertTrue(deployEventMap.containsKey(DeployState.STARTING));
+ // wait for reference module started
+ referenceConfig.getScopeModel().getDeployer().getStartFuture().get();
+ Assertions.assertTrue(deployEventMap.containsKey(DeployState.STARTED));
+
+ // stop consumer app
+ consumerApplicationModel.destroy();
+ Assertions.assertTrue(deployEventMap.containsKey(DeployState.STOPPING));
+ Assertions.assertTrue(deployEventMap.containsKey(DeployState.STOPPED));
+
+ // stop provider app
+ providerApplicationModel.destroy();
+ Assertions.assertTrue(serviceDeployEventMap.containsKey(DeployState.STOPPING));
+ Assertions.assertTrue(serviceDeployEventMap.containsKey(DeployState.STOPPED));
+
+ } finally {
+ FrameworkModel.destroyAll();
+ }
+ }
+
+ @Test
+ public void testAsyncExportAndReferServices() throws ExecutionException, InterruptedException {
+ DubboBootstrap providerBootstrap = DubboBootstrap.newInstance();
+ DubboBootstrap consumerBootstrap = DubboBootstrap.newInstance();
+ try {
+
+ ServiceConfig serviceConfig = new ServiceConfig();
+ serviceConfig.setInterface(Greeting.class);
+ serviceConfig.setRef(new GreetingLocal2());
+ serviceConfig.setExportAsync(true);
+
+ ReferenceConfig<Greeting> referenceConfig = new ReferenceConfig<>();
+ referenceConfig.setInterface(Greeting.class);
+ referenceConfig.setInjvm(false);
+ referenceConfig.setReferAsync(true);
+ referenceConfig.setCheck(false);
+
+ // provider app
+ Future providerFuture = providerBootstrap
+ .application("provider-app")
+ .registry(registryConfig)
+ .protocol(new ProtocolConfig("dubbo", -1))
+ .service(serviceConfig)
+ .asyncStart();
+ logger.warn("provider app has start async");
+ Assertions.assertFalse(serviceConfig.getScopeModel().getDeployer().isStarted(), "Async export seems something wrong");
+
+ // consumer app
+ Future consumerFuture = consumerBootstrap
+ .application("consumer-app")
+ .registry(registryConfig)
+ .reference(referenceConfig)
+ .asyncStart();
+ logger.warn("consumer app has start async");
+ Assertions.assertFalse(referenceConfig.getScopeModel().getDeployer().isStarted(), "Async refer seems something wrong");
+
+ // wait for provider app startup
+ providerFuture.get();
+ logger.warn("provider app is startup");
+ Assertions.assertEquals(true, serviceConfig.isExported());
+ ServiceDescriptor serviceDescriptor = serviceConfig.getScopeModel().getServiceRepository().lookupService(Greeting.class.getName());
+ Assertions.assertNotNull(serviceDescriptor);
+
+ // wait for consumer app startup
+ consumerFuture.get();
+ logger.warn("consumer app is startup");
+ Object target = referenceConfig.getServiceMetadata().getTarget();
+ Assertions.assertNotNull(target);
+ Greeting greetingService = (Greeting) target;
+ String result = greetingService.hello();
+ Assertions.assertEquals("local", result);
+ } finally {
+ providerBootstrap.stop();
+ consumerBootstrap.stop();
+ }
+ }
private DubboBootstrap configConsumerApp(DubboBootstrap dubboBootstrap) {
ReferenceConfig<DemoService> referenceConfig = new ReferenceConfig<>();
@@ -749,4 +892,44 @@ public class DubboBootstrapMultiInstanceTest {
return dubboBootstrap;
}
+ private static class DeployEventHandler implements DeployListener<ModuleModel> {
+
+ Map<DeployState, Long> deployEventMap = new LinkedHashMap<>();
+
+ ModuleModel moduleModel;
+
+ public DeployEventHandler(ModuleModel moduleModel) {
+ this.moduleModel = moduleModel;
+ }
+
+ @Override
+ public void onStarting(ModuleModel scopeModel) {
+ Assertions.assertEquals(moduleModel, scopeModel);
+ deployEventMap.put(DeployState.STARTING, System.currentTimeMillis());
+ }
+
+ @Override
+ public void onStarted(ModuleModel scopeModel) {
+ Assertions.assertEquals(moduleModel, scopeModel);
+ deployEventMap.put(DeployState.STARTED, System.currentTimeMillis());
+ }
+
+ @Override
+ public void onStopping(ModuleModel scopeModel) {
+ Assertions.assertEquals(moduleModel, scopeModel);
+ deployEventMap.put(DeployState.STOPPING, System.currentTimeMillis());
+ }
+
+ @Override
+ public void onStopped(ModuleModel scopeModel) {
+ Assertions.assertEquals(moduleModel, scopeModel);
+ deployEventMap.put(DeployState.STOPPED, System.currentTimeMillis());
+ }
+
+ @Override
+ public void onFailure(ModuleModel scopeModel, Throwable cause) {
+ Assertions.assertEquals(moduleModel, scopeModel);
+ deployEventMap.put(DeployState.FAILED, System.currentTimeMillis());
+ }
+ }
}
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/metadata/MetadataServiceExporterTest.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/metadata/MetadataServiceExporterTest.java
index 2f95adc..85ad013 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/metadata/MetadataServiceExporterTest.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/metadata/MetadataServiceExporterTest.java
@@ -52,8 +52,6 @@ public class MetadataServiceExporterTest {
@Test
public void test() {
- ApplicationModel.defaultModel().getInternalModule().getDeployer().start();
-
MetadataService metadataService = Mockito.mock(MetadataService.class);
ConfigurableMetadataServiceExporter exporter = new ConfigurableMetadataServiceExporter();
exporter.setMetadataService(metadataService);