You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/13 05:38:11 UTC

[dubbo] branch 3.0 updated: [3.0] Add support for async export/refer (#7931)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 3163790  [3.0] Add support for async export/refer (#7931)
3163790 is described below

commit 3163790d013d82d76339b465385023b0a7f0c3a2
Author: Wu Zhiguo <wz...@gmail.com>
AuthorDate: Sun Jun 13 13:37:49 2021 +0800

    [3.0] Add support for async export/refer (#7931)
    
    * add support for async export/refer
    
    * move awaiting logical to another thread
    
    * change log information
    
    * use same executor
    
    * fix ut
    
    * use async as default mode
    
    * Revert "use async as default mode"
    
    This reverts commit ca373ffb
    
    * add support for custom pool size for async export/refer executor
    
    * add shutdown logic for async export/refer executor
    
    * add shutdown logic for async export/refer executor
    
    * getExportReferExecutor private
    
    * move exportReferExecutor to ExecutorRepository
    
    * delete file added unexpectedly
    
    * fix ut
    
    * rollback executorRepository declaration on Bootstrap
    
    * move async to reference/service/consumer/provider
    
    * move constants
    
    * fix ut
    
    * move async pool size config to provider/consumer config
    
    * add shutdown for executor, add spring xsd config
    
    * shutdown
---
 .../dubbo/common/constants/CommonConstants.java    |  8 +++
 .../manager/DefaultExecutorRepository.java         | 67 +++++++++++++++++--
 .../threadpool/manager/ExecutorRepository.java     |  8 ++-
 .../dubbo/config/AbstractReferenceConfig.java      | 15 +++++
 .../apache/dubbo/config/AbstractServiceConfig.java | 15 +++++
 .../org/apache/dubbo/config/ConsumerConfig.java    | 15 +++++
 .../org/apache/dubbo/config/ProviderConfig.java    | 16 +++++
 .../apache/dubbo/config/ReferenceConfigBase.java   |  9 +++
 .../org/apache/dubbo/config/ServiceConfigBase.java |  9 +++
 .../dubbo/config/annotation/DubboReference.java    |  5 ++
 .../dubbo/config/annotation/DubboService.java      |  4 ++
 .../threadpool/manager/ExecutorRepositoryTest.java |  2 +-
 .../org/apache/dubbo/config/ReferenceConfig.java   |  2 +
 .../dubbo/config/bootstrap/DubboBootstrap.java     | 75 +++++++++++-----------
 .../src/main/resources/META-INF/dubbo.xsd          | 23 +++++++
 15 files changed, 226 insertions(+), 47 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 14db8b2..61c3fc6 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -416,6 +416,14 @@ public interface CommonConstants {
 
     String DEFAULT_VERSION = "0.0.0";
 
+    String EXPORT_ASYNC_KEY = "export-async";
+
+    String REFER_ASYNC_KEY = "refer-async";
+
+    String ASYNC_THREAD_NUM_KEY = "async-thread-num";
+
+    int DEFAULT_ASYNC_THREAD_NUM = 10;
+
     /**
      * Url merge processor key
      */
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index bf1631f..9ef1cd3 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -22,9 +22,13 @@ import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
 import org.apache.dubbo.common.threadpool.ThreadPool;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.config.ConsumerConfig;
+import org.apache.dubbo.config.ProviderConfig;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -34,11 +38,15 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_ASYNC_THREAD_NUM;
 import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
+import static org.apache.dubbo.rpc.model.ApplicationModel.getConfigManager;
 
 /**
  * Consider implementing {@code Licycle} to enable executors shutdown when the process stops.
@@ -52,7 +60,7 @@ public class DefaultExecutorRepository implements ExecutorRepository {
 
     private Ring<ScheduledExecutorService> scheduledExecutors = new Ring<>();
 
-    private ScheduledExecutorService serviceExporterExecutor;
+    private volatile ScheduledExecutorService exportReferExecutor;
 
     public ScheduledExecutorService registryNotificationExecutor;
 
@@ -68,6 +76,8 @@ public class DefaultExecutorRepository implements ExecutorRepository {
 
     private static Ring<ExecutorService> executorServiceRing = new Ring<ExecutorService>();
 
+    private static final Object LOCK = new Object();
+
     public DefaultExecutorRepository() {
         for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) {
             ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
@@ -83,7 +93,6 @@ public class DefaultExecutorRepository implements ExecutorRepository {
 //        reconnectScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-reconnect-scheduler"));
         poolRouterExecutor = new ThreadPoolExecutor(1, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024),
             new NamedInternalThreadFactory("Dubbo-state-router-pool-router", true), new ThreadPoolExecutor.AbortPolicy());
-        serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
         serviceDiscoveryAddressNotificationExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-SD-address-refresh"));
         registryNotificationExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-registry-notification"));
         metadataRetryExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-metadata-retry"));
@@ -118,7 +127,7 @@ public class DefaultExecutorRepository implements ExecutorRepository {
          */
         if (executors == null) {
             logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
-                    "before coming to here.");
+                "before coming to here.");
             return null;
         }
 
@@ -142,7 +151,7 @@ public class DefaultExecutorRepository implements ExecutorRepository {
     public void updateThreadpool(URL url, ExecutorService executor) {
         try {
             if (url.hasParameter(THREADS_KEY)
-                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
+                && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
                 ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                 int threads = url.getParameter(THREADS_KEY, 0);
                 int max = threadPoolExecutor.getMaximumPoolSize();
@@ -177,8 +186,51 @@ public class DefaultExecutorRepository implements ExecutorRepository {
     }
 
     @Override
-    public ScheduledExecutorService getServiceExporterExecutor() {
-        return serviceExporterExecutor;
+    public ScheduledExecutorService getExportReferExecutor() {
+        if (exportReferExecutor == null) {
+            synchronized (LOCK) {
+                if (exportReferExecutor == null) {
+                    int coreSize = getExportReferThreadNum();
+                    exportReferExecutor = Executors.newScheduledThreadPool(coreSize,
+                        new NamedThreadFactory("Dubbo-export-refer", true));
+                }
+            }
+        }
+
+        return exportReferExecutor;
+    }
+
+    public void shutdownExportReferExecutor() {
+        synchronized (LOCK) {
+            if (exportReferExecutor != null && !exportReferExecutor.isShutdown()) {
+                exportReferExecutor.shutdown();
+            }
+
+            exportReferExecutor = null;
+        }
+    }
+
+    private Integer getExportReferThreadNum() {
+        Stream<Integer> provider = getConfigManager().getProviders()
+            .stream()
+            .map(ProviderConfig::getAsyncThreadNum);
+
+        Stream<Integer> consumer = getConfigManager().getConsumers()
+            .stream()
+            .map(ConsumerConfig::getAsyncThreadNum);
+
+        List<Integer> threadNums = Stream.concat(provider, consumer)
+            .filter(k -> k != null && k > 0)
+            .collect(Collectors.toList());
+
+        if (CollectionUtils.isEmpty(threadNums)) {
+            logger.info("Cannot get config `async-thread-num` for export-refer thread, using default: " + DEFAULT_ASYNC_THREAD_NUM);
+            return DEFAULT_ASYNC_THREAD_NUM;
+        } else if (threadNums.size() > 1) {
+            logger.info("Detect multiple config `async-thread-num` for export-refer thread, using: " + threadNums.get(0));
+        }
+
+        return threadNums.get(0);
     }
 
     @Override
@@ -212,11 +264,12 @@ public class DefaultExecutorRepository implements ExecutorRepository {
     @Override
     public void destroyAll() {
         poolRouterExecutor.shutdown();
-        serviceExporterExecutor.shutdown();
         serviceDiscoveryAddressNotificationExecutor.shutdown();
         registryNotificationExecutor.shutdown();
         metadataRetryExecutor.shutdown();
 
+        shutdownExportReferExecutor();
+
         data.values().forEach(executors -> {
             if (executors != null) {
                 executors.values().forEach(executor -> {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
index 8e4ce87..513f4bf 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
@@ -57,7 +57,13 @@ public interface ExecutorRepository {
 
     ExecutorService nextExecutorExecutor();
 
-    ScheduledExecutorService getServiceExporterExecutor();
+    ScheduledExecutorService getExportReferExecutor();
+
+    /**
+     * The executor only used in bootstrap currently, we should call this method to release the resource
+     * after the async export-refer is done.
+     */
+    void shutdownExportReferExecutor();
 
     ScheduledExecutorService getServiceDiscoveryAddressNotificationExecutor();
 
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java
index 17edea4..e688a58 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractReferenceConfig.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.rpc.support.ProtocolUtils;
 
 import static org.apache.dubbo.common.constants.CommonConstants.INVOKER_LISTENER_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.REFER_ASYNC_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.STUB_EVENT_KEY;
 
 /**
@@ -80,6 +81,11 @@ public abstract class AbstractReferenceConfig extends AbstractInterfaceConfig {
 
     protected String router;
 
+    /**
+     * Weather the reference is refer asynchronously
+     */
+    private Boolean referAsync;
+
     @Override
     protected void checkDefault() {
         super.checkDefault();
@@ -231,4 +237,13 @@ public abstract class AbstractReferenceConfig extends AbstractInterfaceConfig {
     public void setRouter(String router) {
         this.router = router;
     }
+
+    @Parameter(key = REFER_ASYNC_KEY)
+    public Boolean getReferAsync() {
+        return referAsync;
+    }
+
+    public void setReferAsync(Boolean referAsync) {
+        this.referAsync = referAsync;
+    }
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
index fd9760c..74980f2 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/AbstractServiceConfig.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.dubbo.common.constants.CommonConstants.EXPORTER_LISTENER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.EXPORT_ASYNC_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.SERVICE_FILTER_KEY;
 
 /**
@@ -117,6 +118,11 @@ public abstract class AbstractServiceConfig extends AbstractInterfaceConfig {
      */
     private String serialization;
 
+    /**
+     * Weather the service is export asynchronously
+     */
+    private Boolean exportAsync;
+
     @Override
     protected void checkDefault() {
         super.checkDefault();
@@ -299,4 +305,13 @@ public abstract class AbstractServiceConfig extends AbstractInterfaceConfig {
     public void setSerialization(String serialization) {
         this.serialization = serialization;
     }
+
+    @Parameter(key = EXPORT_ASYNC_KEY)
+    public Boolean getExportAsync() {
+        return exportAsync;
+    }
+
+    public void setExportAsync(Boolean exportAsync) {
+        this.exportAsync = exportAsync;
+    }
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java
index 69b2b25..137a679 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.config;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.config.support.Parameter;
 
+import static org.apache.dubbo.common.constants.CommonConstants.ASYNC_THREAD_NUM_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.URL_MERGE_PROCESSOR_KEY;
 
 /**
@@ -67,6 +68,11 @@ public class ConsumerConfig extends AbstractReferenceConfig {
      */
     private String urlMergeProcessor;
 
+    /**
+     * Thread num for asynchronous export-refer pool size
+     */
+    private Integer asyncThreadNum;
+
     @Override
     public void setTimeout(Integer timeout) {
         super.setTimeout(timeout);
@@ -133,4 +139,13 @@ public class ConsumerConfig extends AbstractReferenceConfig {
     public void setUrlMergeProcessor(String urlMergeProcessor) {
         this.urlMergeProcessor = urlMergeProcessor;
     }
+
+    @Parameter(key = ASYNC_THREAD_NUM_KEY)
+    public Integer getAsyncThreadNum() {
+        return asyncThreadNum;
+    }
+
+    public void setAsyncThreadNum(Integer asyncThreadNum) {
+        this.asyncThreadNum = asyncThreadNum;
+    }
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java
index 9e0a2dd..f7246e1 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java
@@ -21,6 +21,8 @@ import org.apache.dubbo.config.support.Parameter;
 import java.util.ArrayList;
 import java.util.Arrays;
 
+import static org.apache.dubbo.common.constants.CommonConstants.ASYNC_THREAD_NUM_KEY;
+
 /**
  * The service provider default configuration
  *
@@ -154,6 +156,11 @@ public class ProviderConfig extends AbstractServiceConfig {
      */
     private Integer wait;
 
+    /**
+     * Thread num for asynchronous export-refer pool size
+     */
+    private Integer asyncThreadNum;
+
     @Deprecated
     public void setProtocol(String protocol) {
         this.protocols = new ArrayList<>(Arrays.asList(new ProtocolConfig(protocol)));
@@ -423,6 +430,15 @@ public class ProviderConfig extends AbstractServiceConfig {
         this.wait = wait;
     }
 
+    @Parameter(key = ASYNC_THREAD_NUM_KEY)
+    public Integer getAsyncThreadNum() {
+        return asyncThreadNum;
+    }
+
+    public void setAsyncThreadNum(Integer asyncThreadNum) {
+        this.asyncThreadNum = asyncThreadNum;
+    }
+
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder("ProviderConfig{");
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java
index 32faad1..c3d2af7 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java
@@ -311,6 +311,15 @@ public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
         return StringUtils.isEmpty(this.group) ? (consumer != null ? consumer.getGroup() : this.group) : this.group;
     }
 
+    public Boolean shouldReferAsync() {
+        Boolean shouldReferAsync = getReferAsync();
+        if (shouldReferAsync == null) {
+            shouldReferAsync = consumer != null && consumer.getReferAsync() != null && consumer.getReferAsync();
+        }
+
+        return shouldReferAsync;
+    }
+
     public abstract T get();
 
     public abstract void destroy();
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
index 73359e9..e38eb1e 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
@@ -427,6 +427,15 @@ public abstract class ServiceConfigBase<T> extends AbstractServiceConfig {
         super.computeValidRegistryIds();
     }
 
+    public Boolean shouldExportAsync() {
+        Boolean shouldExportAsync = getExportAsync();
+        if (shouldExportAsync == null) {
+            shouldExportAsync = provider != null && provider.getExportAsync() != null && provider.getExportAsync();
+        }
+
+        return shouldExportAsync;
+    }
+
     public abstract void export();
 
     public abstract void unexport();
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboReference.java b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboReference.java
index 132e863..cc74ae2 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboReference.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboReference.java
@@ -347,4 +347,9 @@ public @interface DubboReference {
      * @see org.apache.dubbo.rpc.Constants#SCOPE_REMOTE
      */
     String scope() default "";
+
+    /**
+     * Weather the reference is refer asynchronously
+     */
+    boolean referAsync() default false;
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
index ba9fc3f..be665e4 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/annotation/DubboService.java
@@ -312,4 +312,8 @@ public @interface DubboService {
      */
     String scope() default "";
 
+    /**
+     * Weather the service is export asynchronously
+     */
+    boolean exportAsync() default false;
 }
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
index b19eb37..c7ae0f3 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
@@ -34,7 +34,7 @@ public class ExecutorRepositoryTest {
         testGet(URL.valueOf("dubbo://127.0.0.1:23456?side=consumer"));
 
         Assertions.assertNotNull(executorRepository.getSharedExecutor());
-        Assertions.assertNotNull(executorRepository.getServiceExporterExecutor());
+        Assertions.assertNotNull(executorRepository.getExportReferExecutor());
         executorRepository.nextScheduledExecutor();
     }
 
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index 7b0ae17..6455079 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -200,9 +200,11 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
         if (destroyed) {
             throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
         }
+
         if (ref == null) {
             init();
         }
+
         return ref;
     }
 
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index dc8e526..f58b290 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -27,7 +27,6 @@ import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.lang.ShutdownHookCallbacks;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.threadpool.concurrent.ScheduledCompletableFuture;
 import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
 import org.apache.dubbo.common.utils.ArrayUtils;
 import org.apache.dubbo.common.utils.CollectionUtils;
@@ -88,7 +87,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
@@ -159,6 +157,7 @@ public class DubboBootstrap {
     private final ExecutorService executorService = newSingleThreadExecutor();
 
     private final ExecutorRepository executorRepository = getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+    ;
 
     private final ConfigManager configManager;
 
@@ -166,10 +165,6 @@ public class DubboBootstrap {
 
     private ReferenceConfigCache cache;
 
-    private volatile boolean exportAsync;
-
-    private volatile boolean referAsync;
-
     private AtomicBoolean initialized = new AtomicBoolean(false);
 
     private AtomicBoolean started = new AtomicBoolean(false);
@@ -188,9 +183,9 @@ public class DubboBootstrap {
 
     private List<ServiceConfigBase<?>> exportedServices = new ArrayList<>();
 
-    private List<Future<?>> asyncExportingFutures = new ArrayList<>();
+    private final List<CompletableFuture<?>> asyncExportingFutures = new ArrayList<>();
 
-    private List<CompletableFuture<Object>> asyncReferringFutures = new ArrayList<>();
+    private final List<CompletableFuture<?>> asyncReferringFutures = new ArrayList<>();
 
     private static boolean ignoreConfigState;
 
@@ -544,16 +539,6 @@ public class DubboBootstrap {
         return cache;
     }
 
-    public DubboBootstrap exportAsync() {
-        this.exportAsync = true;
-        return this;
-    }
-
-    public DubboBootstrap referAsync() {
-        this.referAsync = true;
-        return this;
-    }
-
     /**
      * Initialize
      */
@@ -1108,17 +1093,17 @@ public class DubboBootstrap {
             if (!isOnlyRegisterProvider() || hasExportedServices()) {
                 // 2. export MetadataService
                 exportMetadataService();
-                //3. Register the local ServiceInstance if required
+                // 3. Register the local ServiceInstance if required
                 registerServiceInstance();
             }
 
             referServices();
-            if (asyncExportingFutures.size() > 0) {
+            if (asyncExportingFutures.size() > 0 || asyncReferringFutures.size() > 0) {
                 new Thread(() -> {
                     try {
                         this.awaitFinish();
                     } catch (Exception e) {
-                        logger.warn(NAME + " exportAsync occurred an exception.");
+                        logger.warn(NAME + " asynchronous export / refer occurred an exception.");
                     }
                     startup.set(true);
                     if (logger.isInfoEnabled()) {
@@ -1133,6 +1118,7 @@ public class DubboBootstrap {
                 }
                 onStart();
             }
+
             if (logger.isInfoEnabled()) {
                 logger.info(NAME + " has started.");
             }
@@ -1171,17 +1157,24 @@ public class DubboBootstrap {
     }
 
     public DubboBootstrap awaitFinish() throws Exception {
-        logger.info(NAME + " waiting services exporting / referring ...");
-        if (exportAsync && asyncExportingFutures.size() > 0) {
-            CompletableFuture future = CompletableFuture.allOf(asyncExportingFutures.toArray(new CompletableFuture[0]));
+        logger.info(NAME + " waiting services exporting / referring asynchronously...");
+
+        if (asyncExportingFutures.size() > 0) {
+            CompletableFuture<?> future = CompletableFuture.allOf(asyncExportingFutures.toArray(new CompletableFuture[0]));
             future.get();
         }
-        if (referAsync && asyncReferringFutures.size() > 0) {
-            CompletableFuture future = CompletableFuture.allOf(asyncReferringFutures.toArray(new CompletableFuture[0]));
+
+        if (asyncReferringFutures.size() > 0) {
+            CompletableFuture<?> future = CompletableFuture.allOf(asyncReferringFutures.toArray(new CompletableFuture[0]));
             future.get();
         }
 
-        logger.info("Service export / refer finished.");
+        logger.info("Service asynchronous export / refer finished.");
+
+        // release the resources.
+        logger.info("Shutting down the export-refer executor.");
+        executorRepository.shutdownExportReferExecutor();
+
         return this;
     }
 
@@ -1280,22 +1273,23 @@ public class DubboBootstrap {
     private void exportServices() {
         for (ServiceConfigBase sc : configManager.getServices()) {
             // TODO, compatible with ServiceConfig.export()
-            ServiceConfig serviceConfig = (ServiceConfig) sc;
+            ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
             serviceConfig.setBootstrap(this);
             if (!serviceConfig.isRefreshed()) {
                 serviceConfig.refresh();
             }
 
-            if (exportAsync) {
-                ExecutorService executor = executorRepository.getServiceExporterExecutor();
-                Future<?> future = executor.submit(() -> {
+            if (sc.shouldExportAsync()) {
+                ExecutorService executor = executorRepository.getExportReferExecutor();
+                CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                     try {
                         sc.export();
                         exportedServices.add(sc);
                     } catch (Throwable t) {
                         logger.error("export async catch error : " + t.getMessage(), t);
                     }
-                });
+                }, executor);
+
                 asyncExportingFutures.add(future);
             } else {
                 sc.export();
@@ -1326,18 +1320,23 @@ public class DubboBootstrap {
 
         configManager.getReferences().forEach(rc -> {
             // TODO, compatible with  ReferenceConfig.refer()
-            ReferenceConfig referenceConfig = (ReferenceConfig) rc;
+            ReferenceConfig<?> referenceConfig = (ReferenceConfig<?>) rc;
             referenceConfig.setBootstrap(this);
             if (!referenceConfig.isRefreshed()) {
                 referenceConfig.refresh();
             }
 
             if (rc.shouldInit()) {
-                if (referAsync) {
-                    CompletableFuture<Object> future = ScheduledCompletableFuture.submit(
-                        executorRepository.getServiceExporterExecutor(),
-                        () -> cache.get(rc)
-                    );
+                if (rc.shouldReferAsync()) {
+                    ExecutorService executor = executorRepository.getExportReferExecutor();
+                    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                        try {
+                            cache.get(rc);
+                        } catch (Throwable t) {
+                            logger.error("refer async catch error : " + t.getMessage(), t);
+                        }
+                    }, executor);
+
                     asyncReferringFutures.add(future);
                 } else {
                     cache.get(rc);
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index 9113480..5d87f8b 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -245,6 +245,12 @@
                             <![CDATA[ The routers ]]></xsd:documentation>
                     </xsd:annotation>
                 </xsd:attribute>
+                <xsd:attribute name="refer-async" type="xsd:boolean">
+                    <xsd:annotation>
+                        <xsd:documentation>
+                            <![CDATA[ Weather the reference is refer asynchronously, default false. ]]></xsd:documentation>
+                    </xsd:annotation>
+                </xsd:attribute>
             </xsd:extension>
         </xsd:complexContent>
     </xsd:complexType>
@@ -336,6 +342,12 @@
                         <xsd:documentation><![CDATA[ The serialization protocol of service. ]]></xsd:documentation>
                     </xsd:annotation>
                 </xsd:attribute>
+                <xsd:attribute name="export-async" type="xsd:boolean">
+                    <xsd:annotation>
+                        <xsd:documentation>
+                            <![CDATA[ Weather the service is export asynchronously, default false. ]]></xsd:documentation>
+                    </xsd:annotation>
+                </xsd:attribute>
                 <xsd:anyAttribute namespace="##other" processContents="lax"/>
             </xsd:extension>
         </xsd:complexContent>
@@ -1007,6 +1019,11 @@
                         <xsd:documentation><![CDATA[ The Url merge processor. ]]></xsd:documentation>
                     </xsd:annotation>
                 </xsd:attribute>
+                <xsd:attribute name="async-thread-num" type="xsd:integer">
+                    <xsd:annotation>
+                        <xsd:documentation><![CDATA[ Thread num for asynchronous export-refer pool size. ]]></xsd:documentation>
+                    </xsd:annotation>
+                </xsd:attribute>
                 <xsd:anyAttribute namespace="##other" processContents="lax"/>
             </xsd:extension>
         </xsd:complexContent>
@@ -1398,6 +1415,12 @@
                         <xsd:documentation><![CDATA[ Is default. ]]></xsd:documentation>
                     </xsd:annotation>
                 </xsd:attribute>
+                <xsd:attribute name="async-thread-num" type="xsd:integer">
+                    <xsd:annotation>
+                        <xsd:documentation><![CDATA[ Thread num for asynchronous export-refer pool size. ]]></xsd:documentation>
+                    </xsd:annotation>
+                </xsd:attribute>
+
                 <xsd:anyAttribute namespace="##other" processContents="lax"/>
             </xsd:extension>
         </xsd:complexContent>