You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/07/19 04:44:56 UTC
[dubbo] branch 3.0 updated: [3.0] Improve async export / refer
(#8186)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new f761f32 [3.0] Improve async export / refer (#8186)
f761f32 is described below
commit f761f3251923b144a5f471350bbe431911b921f7
Author: Wu Zhiguo <ch...@startdt.com>
AuthorDate: Mon Jul 19 12:44:13 2021 +0800
[3.0] Improve async export / refer (#8186)
* separate export and refer executor
* add export / refer background
* default thread num 10
* start a thread only when export/refer background
---
.../dubbo/common/constants/CommonConstants.java | 12 ++-
.../manager/DefaultExecutorRepository.java | 97 ++++++++++++++-------
.../threadpool/manager/ExecutorRepository.java | 14 +++-
.../org/apache/dubbo/config/ConsumerConfig.java | 31 +++++--
.../org/apache/dubbo/config/ProviderConfig.java | 31 +++++--
.../threadpool/manager/ExecutorRepositoryTest.java | 3 +-
.../dubbo/config/bootstrap/DubboBootstrap.java | 98 ++++++++++++++++++----
.../src/main/resources/META-INF/dubbo.xsd | 19 +++--
8 files changed, 234 insertions(+), 71 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index cdd7fc0..b3e7fec 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -422,9 +422,17 @@ public interface CommonConstants {
String REFER_ASYNC_KEY = "refer-async";
- String ASYNC_THREAD_NUM_KEY = "async-thread-num";
+ String EXPORT_BACKGROUND_KEY = "export-background";
- int DEFAULT_ASYNC_THREAD_NUM = 10;
+ String REFER_BACKGROUND_KEY = "refer-background";
+
+ String EXPORT_THREAD_NUM_KEY = "export-thread-num";
+
+ String REFER_THREAD_NUM_KEY = "refer-thread-num";
+
+ int DEFAULT_EXPORT_THREAD_NUM = 10;
+
+ int DEFAULT_REFER_THREAD_NUM = 10;
/**
* Url merge processor key
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index cc76257..fff55da 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -39,10 +39,10 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
-import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_ASYNC_THREAD_NUM;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_EXPORT_THREAD_NUM;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_REFER_THREAD_NUM;
import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADS_KEY;
@@ -60,7 +60,9 @@ public class DefaultExecutorRepository implements ExecutorRepository {
private Ring<ScheduledExecutorService> scheduledExecutors = new Ring<>();
- private volatile ScheduledExecutorService exportReferExecutor;
+ private volatile ExecutorService serviceExportExecutor;
+
+ private volatile ExecutorService serviceReferExecutor;
private ScheduledExecutorService reconnectScheduledExecutor;
@@ -195,51 +197,89 @@ public class DefaultExecutorRepository implements ExecutorRepository {
}
@Override
- public ScheduledExecutorService getExportReferExecutor() {
- if (exportReferExecutor == null) {
+ public ExecutorService getServiceExportExecutor() {
+ if (serviceExportExecutor == null) {
synchronized (LOCK) {
- if (exportReferExecutor == null) {
- int coreSize = getExportReferThreadNum();
- exportReferExecutor = Executors.newScheduledThreadPool(coreSize,
- new NamedThreadFactory("Dubbo-export-refer", true));
+ if (serviceExportExecutor == null) {
+ int coreSize = getExportThreadNum();
+ serviceExportExecutor = Executors.newFixedThreadPool(coreSize,
+ new NamedThreadFactory("Dubbo-service-export", true));
}
}
}
- return exportReferExecutor;
+ return serviceExportExecutor;
}
- public void shutdownExportReferExecutor() {
+ @Override
+ public void shutdownServiceExportExecutor() {
synchronized (LOCK) {
- if (exportReferExecutor != null && !exportReferExecutor.isShutdown()) {
- exportReferExecutor.shutdown();
+ if (serviceExportExecutor != null && !serviceExportExecutor.isShutdown()) {
+ serviceExportExecutor.shutdown();
}
- exportReferExecutor = null;
+ serviceExportExecutor = null;
}
}
- private Integer getExportReferThreadNum() {
- Stream<Integer> provider = getConfigManager().getProviders()
+ private Integer getExportThreadNum() {
+ List<Integer> threadNum = getConfigManager().getProviders()
.stream()
- .map(ProviderConfig::getAsyncThreadNum);
+ .map(ProviderConfig::getExportThreadNum)
+ .filter(k -> k != null && k > 0)
+ .collect(Collectors.toList());
- Stream<Integer> consumer = getConfigManager().getConsumers()
- .stream()
- .map(ConsumerConfig::getAsyncThreadNum);
+ if (CollectionUtils.isEmpty(threadNum)) {
+ logger.info("Cannot get config `export-thread-num` for service export thread, using default: " + DEFAULT_EXPORT_THREAD_NUM);
+ return DEFAULT_EXPORT_THREAD_NUM;
+ } else if (threadNum.size() > 1) {
+ logger.info("Detect multiple config `export-thread-num` for service export thread, using: " + threadNum.get(0));
+ }
+
+ return threadNum.get(0);
+ }
+
+ @Override
+ public ExecutorService getServiceReferExecutor() {
+ if (serviceReferExecutor == null) {
+ synchronized (LOCK) {
+ if (serviceReferExecutor == null) {
+ int coreSize = getReferThreadNum();
+ serviceReferExecutor = Executors.newFixedThreadPool(coreSize,
+ new NamedThreadFactory("Dubbo-service-refer", true));
+ }
+ }
+ }
+
+ return serviceReferExecutor;
+ }
+
+ @Override
+ public void shutdownServiceReferExecutor() {
+ synchronized (LOCK) {
+ if (serviceReferExecutor != null && !serviceReferExecutor.isShutdown()) {
+ serviceReferExecutor.shutdown();
+ }
+
+ serviceReferExecutor = null;
+ }
+ }
- List<Integer> threadNums = Stream.concat(provider, consumer)
+ private Integer getReferThreadNum() {
+ List<Integer> threadNum = getConfigManager().getConsumers()
+ .stream()
+ .map(ConsumerConfig::getReferThreadNum)
.filter(k -> k != null && k > 0)
.collect(Collectors.toList());
- if (CollectionUtils.isEmpty(threadNums)) {
- logger.info("Cannot get config `async-thread-num` for export-refer thread, using default: " + DEFAULT_ASYNC_THREAD_NUM);
- return DEFAULT_ASYNC_THREAD_NUM;
- } else if (threadNums.size() > 1) {
- logger.info("Detect multiple config `async-thread-num` for export-refer thread, using: " + threadNums.get(0));
+ if (CollectionUtils.isEmpty(threadNum)) {
+ logger.info("Cannot get config `refer-thread-num` for service refer thread, using default: " + DEFAULT_REFER_THREAD_NUM);
+ return DEFAULT_REFER_THREAD_NUM;
+ } else if (threadNum.size() > 1) {
+ logger.info("Detect multiple config `refer-thread-num` for service refer thread, using: " + threadNum.get(0));
}
- return threadNums.get(0);
+ return threadNum.get(0);
}
@Override
@@ -277,7 +317,8 @@ public class DefaultExecutorRepository implements ExecutorRepository {
// registryNotificationExecutor.shutdown();
metadataRetryExecutor.shutdown();
- shutdownExportReferExecutor();
+ shutdownServiceExportExecutor();
+ shutdownServiceReferExecutor();
data.values().forEach(executors -> {
if (executors != null) {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
index 513f4bf..b209f48 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepository.java
@@ -57,13 +57,21 @@ public interface ExecutorRepository {
ExecutorService nextExecutorExecutor();
- ScheduledExecutorService getExportReferExecutor();
+ ExecutorService getServiceExportExecutor();
/**
* The executor only used in bootstrap currently, we should call this method to release the resource
- * after the async export-refer is done.
+ * after the async export is done.
*/
- void shutdownExportReferExecutor();
+ void shutdownServiceExportExecutor();
+
+ ExecutorService getServiceReferExecutor();
+
+ /**
+ * The executor only used in bootstrap currently, we should call this method to release the resource
+ * after the async refer is done.
+ */
+ void shutdownServiceReferExecutor();
ScheduledExecutorService getServiceDiscoveryAddressNotificationExecutor();
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java
index 137a679..cbc3aba 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ConsumerConfig.java
@@ -19,7 +19,8 @@ package org.apache.dubbo.config;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.support.Parameter;
-import static org.apache.dubbo.common.constants.CommonConstants.ASYNC_THREAD_NUM_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.REFER_BACKGROUND_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.REFER_THREAD_NUM_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.URL_MERGE_PROCESSOR_KEY;
/**
@@ -69,9 +70,14 @@ public class ConsumerConfig extends AbstractReferenceConfig {
private String urlMergeProcessor;
/**
- * Thread num for asynchronous export-refer pool size
+ * Thread num for asynchronous refer pool size
*/
- private Integer asyncThreadNum;
+ private Integer referThreadNum;
+
+ /**
+ * Whether refer should run in background or not
+ */
+ private Boolean referBackground;
@Override
public void setTimeout(Integer timeout) {
@@ -140,12 +146,21 @@ public class ConsumerConfig extends AbstractReferenceConfig {
this.urlMergeProcessor = urlMergeProcessor;
}
- @Parameter(key = ASYNC_THREAD_NUM_KEY)
- public Integer getAsyncThreadNum() {
- return asyncThreadNum;
+ @Parameter(key = REFER_THREAD_NUM_KEY, excluded = true)
+ public Integer getReferThreadNum() {
+ return referThreadNum;
+ }
+
+ public void setReferThreadNum(Integer referThreadNum) {
+ this.referThreadNum = referThreadNum;
+ }
+
+ @Parameter(key = REFER_BACKGROUND_KEY, excluded = true)
+ public Boolean getReferBackground() {
+ return referBackground;
}
- public void setAsyncThreadNum(Integer asyncThreadNum) {
- this.asyncThreadNum = asyncThreadNum;
+ public void setReferBackground(Boolean referBackground) {
+ this.referBackground = referBackground;
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java
index 8ebe6e5..83b7cb3 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ProviderConfig.java
@@ -21,7 +21,8 @@ import org.apache.dubbo.config.support.Parameter;
import java.util.ArrayList;
import java.util.Arrays;
-import static org.apache.dubbo.common.constants.CommonConstants.ASYNC_THREAD_NUM_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.EXPORT_BACKGROUND_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.EXPORT_THREAD_NUM_KEY;
/**
* The service provider default configuration
@@ -157,9 +158,14 @@ public class ProviderConfig extends AbstractServiceConfig {
private Integer wait;
/**
- * Thread num for asynchronous export-refer pool size
+ * Thread num for asynchronous export pool size
*/
- private Integer asyncThreadNum;
+ private Integer exportThreadNum;
+
+ /**
+ * Whether export should run in background or not
+ */
+ private Boolean exportBackground;
@Deprecated
public void setProtocol(String protocol) {
@@ -430,13 +436,22 @@ public class ProviderConfig extends AbstractServiceConfig {
this.wait = wait;
}
- @Parameter(key = ASYNC_THREAD_NUM_KEY)
- public Integer getAsyncThreadNum() {
- return asyncThreadNum;
+ @Parameter(key = EXPORT_THREAD_NUM_KEY, excluded = true)
+ public Integer getExportThreadNum() {
+ return exportThreadNum;
+ }
+
+ public void setExportThreadNum(Integer exportThreadNum) {
+ this.exportThreadNum = exportThreadNum;
+ }
+
+ @Parameter(key = EXPORT_BACKGROUND_KEY, excluded = true)
+ public Boolean getExportBackground() {
+ return exportBackground;
}
- public void setAsyncThreadNum(Integer asyncThreadNum) {
- this.asyncThreadNum = asyncThreadNum;
+ public void setExportBackground(Boolean exportBackground) {
+ this.exportBackground = exportBackground;
}
@Override
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
index c7ae0f3..193f7f4 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/threadpool/manager/ExecutorRepositoryTest.java
@@ -34,7 +34,8 @@ public class ExecutorRepositoryTest {
testGet(URL.valueOf("dubbo://127.0.0.1:23456?side=consumer"));
Assertions.assertNotNull(executorRepository.getSharedExecutor());
- Assertions.assertNotNull(executorRepository.getExportReferExecutor());
+ Assertions.assertNotNull(executorRepository.getServiceExportExecutor());
+ Assertions.assertNotNull(executorRepository.getServiceReferExecutor());
executorRepository.nextScheduledExecutor();
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index 2d7e599..7f371fa 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -94,6 +94,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
@@ -189,6 +190,10 @@ public class DubboBootstrap {
private final List<CompletableFuture<?>> asyncReferringFutures = new ArrayList<>();
+ private boolean asyncExportFinish = true;
+
+ private boolean asyncReferFinish = true;
+
private static boolean ignoreConfigState;
/**
@@ -1103,13 +1108,20 @@ public class DubboBootstrap {
}
referServices();
- if (asyncExportingFutures.size() > 0 || asyncReferringFutures.size() > 0) {
+
+ // wait async export / refer finish if needed
+ awaitFinish();
+
+ if (isExportBackground() || isReferBackground()) {
new Thread(() -> {
- try {
- this.awaitFinish();
- } catch (Exception e) {
- logger.warn(NAME + " asynchronous export / refer occurred an exception.");
+ while (!asyncExportFinish || !asyncReferFinish) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.error(NAME + " waiting async export / refer occurred and error.", e);
+ }
}
+
startup.set(true);
if (logger.isInfoEnabled()) {
logger.info(NAME + " is ready.");
@@ -1161,25 +1173,79 @@ public class DubboBootstrap {
return this;
}
- public DubboBootstrap awaitFinish() throws Exception {
- logger.info(NAME + " waiting services exporting / referring asynchronously...");
-
+ private void waitAsyncExportIfNeeded() {
if (asyncExportingFutures.size() > 0) {
+ asyncExportFinish = false;
+ if (isExportBackground()) {
+ new Thread(this::waitExportFinish).start();
+ } else {
+ waitExportFinish();
+ }
+ }
+ }
+
+ private boolean isExportBackground() {
+ List<Boolean> list = configManager.getProviders()
+ .stream()
+ .map(ProviderConfig::getExportBackground)
+ .filter(k -> k != null && k)
+ .collect(Collectors.toList());
+
+ return CollectionUtils.isNotEmpty(list);
+ }
+
+ private void waitExportFinish() {
+ try {
+ logger.info(NAME + " waiting services exporting asynchronously...");
CompletableFuture<?> future = CompletableFuture.allOf(asyncExportingFutures.toArray(new CompletableFuture[0]));
future.get();
+ } catch (Exception e) {
+ logger.warn(NAME + " asynchronous export occurred an exception.");
+ } finally {
+ executorRepository.shutdownServiceExportExecutor();
+ logger.info(NAME + " asynchronous export finished.");
+ asyncExportFinish = true;
}
+ }
+ private void waitAsyncReferIfNeeded() {
if (asyncReferringFutures.size() > 0) {
- CompletableFuture<?> future = CompletableFuture.allOf(asyncReferringFutures.toArray(new CompletableFuture[0]));
- future.get();
+ asyncReferFinish = false;
+ if (isReferBackground()) {
+ new Thread(this::waitReferFinish).start();
+ } else {
+ waitReferFinish();
+ }
}
+ }
- logger.info("Service asynchronous export / refer finished.");
+ private boolean isReferBackground() {
+ List<Boolean> list = configManager.getConsumers()
+ .stream()
+ .map(ConsumerConfig::getReferBackground)
+ .filter(k -> k != null && k)
+ .collect(Collectors.toList());
- // release the resources.
- logger.info("Shutting down the export-refer executor.");
- executorRepository.shutdownExportReferExecutor();
+ return CollectionUtils.isNotEmpty(list);
+ }
+
+ private void waitReferFinish() {
+ try {
+ logger.info(NAME + " waiting services referring asynchronously...");
+ CompletableFuture<?> future = CompletableFuture.allOf(asyncReferringFutures.toArray(new CompletableFuture[0]));
+ future.get();
+ } catch (Exception e) {
+ logger.warn(NAME + " asynchronous refer occurred an exception.");
+ } finally {
+ executorRepository.shutdownServiceExportExecutor();
+ logger.info(NAME + " asynchronous refer finished.");
+ asyncReferFinish = true;
+ }
+ }
+ public DubboBootstrap awaitFinish() {
+ waitAsyncExportIfNeeded();
+ waitAsyncReferIfNeeded();
return this;
}
@@ -1298,7 +1364,7 @@ public class DubboBootstrap {
}
if (sc.shouldExportAsync()) {
- ExecutorService executor = executorRepository.getExportReferExecutor();
+ ExecutorService executor = executorRepository.getServiceExportExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
if (!sc.isExported()) {
@@ -1350,7 +1416,7 @@ public class DubboBootstrap {
if (rc.shouldInit()) {
if (rc.shouldReferAsync()) {
- ExecutorService executor = executorRepository.getExportReferExecutor();
+ ExecutorService executor = executorRepository.getServiceReferExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
cache.get(rc);
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index e38e059..7f9ed80 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -1029,9 +1029,14 @@
<xsd:documentation><![CDATA[ The Url merge processor. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
- <xsd:attribute name="async-thread-num" type="xsd:integer">
+ <xsd:attribute name="refer-thread-num" type="xsd:integer">
<xsd:annotation>
- <xsd:documentation><![CDATA[ Thread num for asynchronous export-refer pool size. ]]></xsd:documentation>
+ <xsd:documentation><![CDATA[ Thread num for asynchronous refer pool size. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="refer-background" type="xsd:boolean">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ Whether refer should run in background or not, default false. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax"/>
@@ -1425,12 +1430,16 @@
<xsd:documentation><![CDATA[ Is default. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
- <xsd:attribute name="async-thread-num" type="xsd:integer">
+ <xsd:attribute name="export-thread-num" type="xsd:integer">
<xsd:annotation>
- <xsd:documentation><![CDATA[ Thread num for asynchronous export-refer pool size. ]]></xsd:documentation>
+ <xsd:documentation><![CDATA[ Thread num for asynchronous export pool size. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
+ <xsd:attribute name="export-background" type="xsd:boolean">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ Whether export should run in background or not, default false. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
-
<xsd:anyAttribute namespace="##other" processContents="lax"/>
</xsd:extension>
</xsd:complexContent>