You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/13 05:38:11 UTC
[dubbo] branch 3.0 updated: [3.0] Add support for async
export/refer (#7931)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 3163790 [3.0] Add support for async export/refer (#7931)
3163790 is described below
commit 3163790d013d82d76339b465385023b0a7f0c3a2
Author: Wu Zhiguo <wz...@gmail.com>
AuthorDate: Sun Jun 13 13:37:49 2021 +0800
[3.0] Add support for async export/refer (#7931)
* add support for async export/refer
* move awaiting logic to another thread
* change log information
* use same executor
* fix ut
* use async as default mode
* Revert "use async as default mode"
This reverts commit ca373ffb
* add support for custom pool size for async export/refer executor
* add shutdown logic for async export/refer executor
* add shutdown logic for async export/refer executor
* getExportReferExecutor private
* move exportReferExecutor to ExecutorRepository
* delete file added unexpectedly
* fix ut
* rollback executorRepository declaration on Bootstrap
* move async to reference/service/consumer/provider
* move constants
* fix ut
* move async pool size config to provider/consumer config
* add shutdown for executor, add spring xsd config
* shutdown
---
.../dubbo/common/constants/CommonConstants.java | 8 +++
.../manager/DefaultExecutorRepository.java | 67 +++++++++++++++++--
.../threadpool/manager/ExecutorRepository.java | 8 ++-
.../dubbo/config/AbstractReferenceConfig.java | 15 +++++
.../apache/dubbo/config/AbstractServiceConfig.java | 15 +++++
.../org/apache/dubbo/config/ConsumerConfig.java | 15 +++++
.../org/apache/dubbo/config/ProviderConfig.java | 16 +++++
.../apache/dubbo/config/ReferenceConfigBase.java | 9 +++
.../org/apache/dubbo/config/ServiceConfigBase.java | 9 +++
.../dubbo/config/annotation/DubboReference.java | 5 ++
.../dubbo/config/annotation/DubboService.java | 4 ++
.../threadpool/manager/ExecutorRepositoryTest.java | 2 +-
.../org/apache/dubbo/config/ReferenceConfig.java | 2 +
.../dubbo/config/bootstrap/DubboBootstrap.java | 75 +++++++++++-----------
.../src/main/resources/META-INF/dubbo.xsd | 23 +++++++
15 files changed, 226 insertions(+), 47 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 14db8b2..61c3fc6 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -416,6 +416,14 @@ public interface CommonConstants {
String DEFAULT_VERSION = "0.0.0";
+ String EXPORT_ASYNC_KEY = "export-async";
+
+ String REFER_ASYNC_KEY = "refer-async";
+
+ String ASYNC_THREAD_NUM_KEY = "async-thread-num";
+
+ int DEFAULT_ASYNC_THREAD_NUM = 10;
+
/**
* Url merge processor key
*/
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index bf1631f..9ef1cd3 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -22,9 +22,13 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.threadpool.ThreadPool;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.config.ConsumerConfig;
+import org.apache.dubbo.config.ProviderConfig;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -34,11 +38,15 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_ASYNC_THREAD_NUM;
import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
+import static org.apache.dubbo.rpc.model.ApplicationModel.getConfigManager;
/**
* Consider implementing {@code Licycle} to enable executors shutdown when the process stops.
@@ -52,7 +60,7 @@ public class DefaultExecutorRepository implements ExecutorRepository {
private Ring<ScheduledExecutorService> scheduledExecutors = new Ring<>();
- private ScheduledExecutorService serviceExporterExecutor;
+ private volatile ScheduledExecutorService exportReferExecutor;
public ScheduledExecutorService registryNotificationExecutor;
@@ -68,6 +76,8 @@ public class DefaultExecutorRepository implements ExecutorRepository {
private static Ring<ExecutorService> executorServiceRing = new Ring<ExecutorService>();
+ private static final Object LOCK = new Object();
+
public DefaultExecutorRepository() {
for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
@@ -83,7 +93,6 @@ public class DefaultExecutorRepository implements ExecutorRepository {
// reconnectScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-reconnect-scheduler"));
poolRouterExecutor = new ThreadPoolExecutor(1, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024),
new NamedInternalThreadFactory("Dubbo-state-router-pool-router", true), new ThreadPoolExecutor.AbortPolicy());
- serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
serviceDiscoveryAddressNotificationExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-SD-address-refresh"));
registryNotificationExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-registry-notification"));
metadataRetryExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-metadata-retry"));
@@ -118,7 +127,7 @@ public class DefaultExecutorRepository implements ExecutorRepository {
*/
if (executors == null) {
logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
- "before coming to here.");
+ "before coming to here.");
return null;
}
@@ -142,7 +151,7 @@ public class DefaultExecutorRepository implements ExecutorRepository {
public void updateThreadpool(URL url, ExecutorService executor) {
try {
if (url.hasParameter(THREADS_KEY)
- && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
+ && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int threads = url.getParameter(THREADS_KEY, 0);
int max = threadPoolExecutor.getMaximumPoolSize();
@@ -177,8 +186,51 @@ public class DefaultExecutorRepository implements ExecutorRepository {
}
@Override
- public ScheduledExecutorService getServiceExporterExecutor() {
- return serviceExporterExecutor;
+ public ScheduledExecutorService getExportReferExecutor() {
+ if (exportReferExecutor == null) {
+ synchronized (LOCK) {
+ if (exportReferExecutor == null) {
+ int coreSize = getExportReferThreadNum();
+ exportReferExecutor = Executors.newScheduledThreadPool(coreSize,
+ new NamedThreadFactory("Dubbo-export-refer", true));
+ }
+ }
+ }
+
+ return exportReferExecutor;
+ }
+
+ public void shutdownExportReferExecutor() {
+ synchronized (LOCK) {
+ if (exportReferExecutor != null && !exportReferExecutor.isShutdown()) {
+ exportReferExecutor.shutdown();
+ }
+
+ exportReferExecutor = null;
+ }
+ }
+
+ private Integer getExportReferThreadNum() {
+ Stream<Integer> provider = getConfigManager().getProviders()
+ .stream()
+ .map(ProviderConfig::getAsyncThreadNum);
+
+ Stream<Integer> consumer = getConfigManager().getConsumers()
+ .stream()
+ .map(ConsumerConfig::getAsyncThreadNum);
+
+ List<Integer> threadNums = Stream.concat(provider, consumer)
+ .filter(k -> k != null && k > 0)
+ .collect(Collectors.toList());
+
+ if (CollectionUtils.isEmpty(threadNums)) {
+ logger.info("Cannot get config `async-thread-num` for export-refer thread, using default: " + DEFAULT_ASYNC_THREAD_NUM);
+ return DEFAULT_ASYNC_THREAD_NUM;
+ } else if (threadNums.size() > 1) {
+ logger.info("Detect multiple config `async-thread-num` for export-refer thread, using: " + threadNums.get(0));
+ }
+
+ return threadNums.get(0);
}
@Override
@@ -212,11 +264,12 @@ public class DefaultExecutorRepository implements ExecutorRepository {
@Override
public void destroyAll() {
poolRouterExecutor.shutdown();
- serviceExporterExecutor.shutdown();
serviceDiscoveryAddressNotificationExecutor.shutdown();
registryNotificationExecutor.shutdown();
metadataRetryExecutor.shutdown();
+ shutdownExportReferExecutor();
+
data.values().forEach(executors -> {
if (executors != null) {
executors.values().forEach(executor -> {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
index 8e4ce87..513f4bf 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
@@ -57,7 +57,13 @@ public interface ExecutorRepository {
ExecutorService nextExecutorExecutor();
- ScheduledExecutorService getServiceExporterExecutor();
+ ScheduledExecutorService getExportReferExecutor();
+
+ /**
+ * This executor is currently used only by the bootstrap; call this method to release its resources
+ * once the asynchronous export/refer has finished.
+ */
+ void shutdownExportReferExecutor();
ScheduledExecutorService getServiceDiscoveryAddressNotificationExecutor();
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java
index 17edea4..e688a58 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.rpc.support.ProtocolUtils;
import static org.apache.dubbo.common.constants.CommonConstants.INVOKER_LISTENER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.REFER_ASYNC_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.STUB_EVENT_KEY;
/**
@@ -80,6 +81,11 @@ public abstract class AbstractReferenceConfig extends AbstractInterfaceConfig {
protected String router;
+ /**
+ * Whether the reference is referred asynchronously
+ */
+ private Boolean referAsync;
+
@Override
protected void checkDefault() {
super.checkDefault();
@@ -231,4 +237,13 @@ public abstract class AbstractReferenceConfig extends AbstractInterfaceConfig {
public void setRouter(String router) {
this.router = router;
}
+
+ @Parameter(key = REFER_ASYNC_KEY)
+ public Boolean getReferAsync() {
+ return referAsync;
+ }
+
+ public void setReferAsync(Boolean referAsync) {
+ this.referAsync = referAsync;
+ }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
index fd9760c..74980f2 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.List;
import static org.apache.dubbo.common.constants.CommonConstants.EXPORTER_LISTENER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.EXPORT_ASYNC_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SERVICE_FILTER_KEY;
/**
@@ -117,6 +118,11 @@ public abstract class AbstractServiceConfig extends AbstractInterfaceConfig {
*/
private String serialization;
+ /**
+ * Whether the service is exported asynchronously
+ */
+ private Boolean exportAsync;
+
@Override
protected void checkDefault() {
super.checkDefault();
@@ -299,4 +305,13 @@ public abstract class AbstractServiceConfig extends AbstractInterfaceConfig {
public void setSerialization(String serialization) {
this.serialization = serialization;
}
+
+ @Parameter(key = EXPORT_ASYNC_KEY)
+ public Boolean getExportAsync() {
+ return exportAsync;
+ }
+
+ public void setExportAsync(Boolean exportAsync) {
+ this.exportAsync = exportAsync;
+ }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java
index 69b2b25..137a679 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.config;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.support.Parameter;
+import static org.apache.dubbo.common.constants.CommonConstants.ASYNC_THREAD_NUM_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.URL_MERGE_PROCESSOR_KEY;
/**
@@ -67,6 +68,11 @@ public class ConsumerConfig extends AbstractReferenceConfig {
*/
private String urlMergeProcessor;
+ /**
+ * Number of threads in the pool used for asynchronous export/refer
+ */
+ private Integer asyncThreadNum;
+
@Override
public void setTimeout(Integer timeout) {
super.setTimeout(timeout);
@@ -133,4 +139,13 @@ public class ConsumerConfig extends AbstractReferenceConfig {
public void setUrlMergeProcessor(String urlMergeProcessor) {
this.urlMergeProcessor = urlMergeProcessor;
}
+
+ @Parameter(key = ASYNC_THREAD_NUM_KEY)
+ public Integer getAsyncThreadNum() {
+ return asyncThreadNum;
+ }
+
+ public void setAsyncThreadNum(Integer asyncThreadNum) {
+ this.asyncThreadNum = asyncThreadNum;
+ }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java
index 9e0a2dd..f7246e1 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java
@@ -21,6 +21,8 @@ import org.apache.dubbo.config.support.Parameter;
import java.util.ArrayList;
import java.util.Arrays;
+import static org.apache.dubbo.common.constants.CommonConstants.ASYNC_THREAD_NUM_KEY;
+
/**
* The service provider default configuration
*
@@ -154,6 +156,11 @@ public class ProviderConfig extends AbstractServiceConfig {
*/
private Integer wait;
+ /**
+ * Number of threads in the pool used for asynchronous export/refer
+ */
+ private Integer asyncThreadNum;
+
@Deprecated
public void setProtocol(String protocol) {
this.protocols = new ArrayList<>(Arrays.asList(new ProtocolConfig(protocol)));
@@ -423,6 +430,15 @@ public class ProviderConfig extends AbstractServiceConfig {
this.wait = wait;
}
+ @Parameter(key = ASYNC_THREAD_NUM_KEY)
+ public Integer getAsyncThreadNum() {
+ return asyncThreadNum;
+ }
+
+ public void setAsyncThreadNum(Integer asyncThreadNum) {
+ this.asyncThreadNum = asyncThreadNum;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ProviderConfig{");
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java
index 32faad1..c3d2af7 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java
@@ -311,6 +311,15 @@ public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
return StringUtils.isEmpty(this.group) ? (consumer != null ? consumer.getGroup() : this.group) : this.group;
}
+ public Boolean shouldReferAsync() {
+ Boolean shouldReferAsync = getReferAsync();
+ if (shouldReferAsync == null) {
+ shouldReferAsync = consumer != null && consumer.getReferAsync() != null && consumer.getReferAsync();
+ }
+
+ return shouldReferAsync;
+ }
+
public abstract T get();
public abstract void destroy();
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
index 73359e9..e38eb1e 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
@@ -427,6 +427,15 @@ public abstract class ServiceConfigBase<T> extends AbstractServiceConfig {
super.computeValidRegistryIds();
}
+ public Boolean shouldExportAsync() {
+ Boolean shouldExportAsync = getExportAsync();
+ if (shouldExportAsync == null) {
+ shouldExportAsync = provider != null && provider.getExportAsync() != null && provider.getExportAsync();
+ }
+
+ return shouldExportAsync;
+ }
+
public abstract void export();
public abstract void unexport();
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboReference.java b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboReference.java
index 132e863..cc74ae2 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboReference.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboReference.java
@@ -347,4 +347,9 @@ public @interface DubboReference {
* @see org.apache.dubbo.rpc.Constants#SCOPE_REMOTE
*/
String scope() default "";
+
+ /**
+ * Whether the reference is referred asynchronously
+ */
+ boolean referAsync() default false;
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
index ba9fc3f..be665e4 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
@@ -312,4 +312,8 @@ public @interface DubboService {
*/
String scope() default "";
+ /**
+ * Whether the service is exported asynchronously
+ */
+ boolean exportAsync() default false;
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
index b19eb37..c7ae0f3 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
@@ -34,7 +34,7 @@ public class ExecutorRepositoryTest {
testGet(URL.valueOf("dubbo://127.0.0.1:23456?side=consumer"));
Assertions.assertNotNull(executorRepository.getSharedExecutor());
- Assertions.assertNotNull(executorRepository.getServiceExporterExecutor());
+ Assertions.assertNotNull(executorRepository.getExportReferExecutor());
executorRepository.nextScheduledExecutor();
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index 7b0ae17..6455079 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -200,9 +200,11 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
+
if (ref == null) {
init();
}
+
return ref;
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index dc8e526..f58b290 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -27,7 +27,6 @@ import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.lang.ShutdownHookCallbacks;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.threadpool.concurrent.ScheduledCompletableFuture;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ArrayUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
@@ -88,7 +87,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
@@ -159,6 +157,7 @@ public class DubboBootstrap {
private final ExecutorService executorService = newSingleThreadExecutor();
private final ExecutorRepository executorRepository = getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+ ;
private final ConfigManager configManager;
@@ -166,10 +165,6 @@ public class DubboBootstrap {
private ReferenceConfigCache cache;
- private volatile boolean exportAsync;
-
- private volatile boolean referAsync;
-
private AtomicBoolean initialized = new AtomicBoolean(false);
private AtomicBoolean started = new AtomicBoolean(false);
@@ -188,9 +183,9 @@ public class DubboBootstrap {
private List<ServiceConfigBase<?>> exportedServices = new ArrayList<>();
- private List<Future<?>> asyncExportingFutures = new ArrayList<>();
+ private final List<CompletableFuture<?>> asyncExportingFutures = new ArrayList<>();
- private List<CompletableFuture<Object>> asyncReferringFutures = new ArrayList<>();
+ private final List<CompletableFuture<?>> asyncReferringFutures = new ArrayList<>();
private static boolean ignoreConfigState;
@@ -544,16 +539,6 @@ public class DubboBootstrap {
return cache;
}
- public DubboBootstrap exportAsync() {
- this.exportAsync = true;
- return this;
- }
-
- public DubboBootstrap referAsync() {
- this.referAsync = true;
- return this;
- }
-
/**
* Initialize
*/
@@ -1108,17 +1093,17 @@ public class DubboBootstrap {
if (!isOnlyRegisterProvider() || hasExportedServices()) {
// 2. export MetadataService
exportMetadataService();
- //3. Register the local ServiceInstance if required
+ // 3. Register the local ServiceInstance if required
registerServiceInstance();
}
referServices();
- if (asyncExportingFutures.size() > 0) {
+ if (asyncExportingFutures.size() > 0 || asyncReferringFutures.size() > 0) {
new Thread(() -> {
try {
this.awaitFinish();
} catch (Exception e) {
- logger.warn(NAME + " exportAsync occurred an exception.");
+ logger.warn(NAME + " asynchronous export / refer occurred an exception.");
}
startup.set(true);
if (logger.isInfoEnabled()) {
@@ -1133,6 +1118,7 @@ public class DubboBootstrap {
}
onStart();
}
+
if (logger.isInfoEnabled()) {
logger.info(NAME + " has started.");
}
@@ -1171,17 +1157,24 @@ public class DubboBootstrap {
}
public DubboBootstrap awaitFinish() throws Exception {
- logger.info(NAME + " waiting services exporting / referring ...");
- if (exportAsync && asyncExportingFutures.size() > 0) {
- CompletableFuture future = CompletableFuture.allOf(asyncExportingFutures.toArray(new CompletableFuture[0]));
+ logger.info(NAME + " waiting services exporting / referring asynchronously...");
+
+ if (asyncExportingFutures.size() > 0) {
+ CompletableFuture<?> future = CompletableFuture.allOf(asyncExportingFutures.toArray(new CompletableFuture[0]));
future.get();
}
- if (referAsync && asyncReferringFutures.size() > 0) {
- CompletableFuture future = CompletableFuture.allOf(asyncReferringFutures.toArray(new CompletableFuture[0]));
+
+ if (asyncReferringFutures.size() > 0) {
+ CompletableFuture<?> future = CompletableFuture.allOf(asyncReferringFutures.toArray(new CompletableFuture[0]));
future.get();
}
- logger.info("Service export / refer finished.");
+ logger.info("Service asynchronous export / refer finished.");
+
+ // release the resources.
+ logger.info("Shutting down the export-refer executor.");
+ executorRepository.shutdownExportReferExecutor();
+
return this;
}
@@ -1280,22 +1273,23 @@ public class DubboBootstrap {
private void exportServices() {
for (ServiceConfigBase sc : configManager.getServices()) {
// TODO, compatible with ServiceConfig.export()
- ServiceConfig serviceConfig = (ServiceConfig) sc;
+ ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
serviceConfig.setBootstrap(this);
if (!serviceConfig.isRefreshed()) {
serviceConfig.refresh();
}
- if (exportAsync) {
- ExecutorService executor = executorRepository.getServiceExporterExecutor();
- Future<?> future = executor.submit(() -> {
+ if (sc.shouldExportAsync()) {
+ ExecutorService executor = executorRepository.getExportReferExecutor();
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
sc.export();
exportedServices.add(sc);
} catch (Throwable t) {
logger.error("export async catch error : " + t.getMessage(), t);
}
- });
+ }, executor);
+
asyncExportingFutures.add(future);
} else {
sc.export();
@@ -1326,18 +1320,23 @@ public class DubboBootstrap {
configManager.getReferences().forEach(rc -> {
// TODO, compatible with ReferenceConfig.refer()
- ReferenceConfig referenceConfig = (ReferenceConfig) rc;
+ ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
referenceConfig.setBootstrap(this);
if (!referenceConfig.isRefreshed()) {
referenceConfig.refresh();
}
if (rc.shouldInit()) {
- if (referAsync) {
- CompletableFuture<Object> future = ScheduledCompletableFuture.submit(
- executorRepository.getServiceExporterExecutor(),
- () -> cache.get(rc)
- );
+ if (rc.shouldReferAsync()) {
+ ExecutorService executor = executorRepository.getExportReferExecutor();
+ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+ try {
+ cache.get(rc);
+ } catch (Throwable t) {
+ logger.error("refer async catch error : " + t.getMessage(), t);
+ }
+ }, executor);
+
asyncReferringFutures.add(future);
} else {
cache.get(rc);
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index 9113480..5d87f8b 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -245,6 +245,12 @@
<![CDATA[ The routers ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="refer-async" type="xsd:boolean">
+ <xsd:annotation>
+ <xsd:documentation>
+ <![CDATA[ Whether the reference is referred asynchronously, default false. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
@@ -336,6 +342,12 @@
<xsd:documentation><![CDATA[ The serialization protocol of service. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="export-async" type="xsd:boolean">
+ <xsd:annotation>
+ <xsd:documentation>
+ <![CDATA[ Whether the service is exported asynchronously, default false. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax"/>
</xsd:extension>
</xsd:complexContent>
@@ -1007,6 +1019,11 @@
<xsd:documentation><![CDATA[ The Url merge processor. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="async-thread-num" type="xsd:integer">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ Number of threads in the pool used for asynchronous export/refer. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax"/>
</xsd:extension>
</xsd:complexContent>
@@ -1398,6 +1415,12 @@
<xsd:documentation><![CDATA[ Is default. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="async-thread-num" type="xsd:integer">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ Number of threads in the pool used for asynchronous export/refer. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+
<xsd:anyAttribute namespace="##other" processContents="lax"/>
</xsd:extension>
</xsd:complexContent>