You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/02/22 06:01:34 UTC
[dubbo] branch master updated: fix the bug of broadcast cluster
invoke (#7174)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new c24ccf0 fix the bug of broadcast cluster invoke (#7174)
c24ccf0 is described below
commit c24ccf0d1c51a035eb3680d59894195a935b0390
Author: xiaoheng1 <20...@qq.com>
AuthorDate: Mon Feb 22 14:01:24 2021 +0800
fix the bug of broadcast cluster invoke (#7174)
---
.../cluster/support/BroadcastClusterInvoker.java | 62 +++++++++++++++++++---
1 file changed, 56 insertions(+), 6 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastClusterInvoker.java
index ab0623e..a2b7cad 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastClusterInvoker.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.rpc.cluster.support;
+import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Invocation;
@@ -26,15 +27,18 @@ import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
+import java.text.Format;
import java.util.List;
/**
* BroadcastClusterInvoker
- *
*/
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
+ private static final String BROADCAST_FAIL_PERCENT_KEY = "broadcast.fail.percent";
+ private static final int MAX_BROADCAST_FAIL_PERCENT = 100;
+ private static final int MIN_BROADCAST_FAIL_PERCENT = 0;
public BroadcastClusterInvoker(Directory<T> directory) {
super(directory);
@@ -47,21 +51,67 @@ public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
+ URL url = getUrl();
+ // The value range of broadcast.fail.threshold must be 0~100.
+ // 100 means that an exception will be thrown last, and 0 means that as long as an exception occurs, it will be thrown.
+ // see https://github.com/apache/dubbo/pull/7174
+ int broadcastFailPercent = url.getParameter(BROADCAST_FAIL_PERCENT_KEY, MAX_BROADCAST_FAIL_PERCENT);
+
+ if (broadcastFailPercent < MIN_BROADCAST_FAIL_PERCENT || broadcastFailPercent > MAX_BROADCAST_FAIL_PERCENT) {
+ logger.info(String.format("The value corresponding to the broadcast.fail.percent parameter must be between 0 and 100. " +
+ "The current setting is %s, which is reset to 100.", broadcastFailPercent));
+ broadcastFailPercent = MAX_BROADCAST_FAIL_PERCENT;
+ }
+
+ int failThresholdIndex = invokers.size() * broadcastFailPercent / MAX_BROADCAST_FAIL_PERCENT;
+ int failIndex = 0;
for (Invoker<T> invoker : invokers) {
try {
result = invoker.invoke(invocation);
- } catch (RpcException e) {
- exception = e;
- logger.warn(e.getMessage(), e);
+ if (null != result && result.hasException()) {
+ Throwable resultException = result.getException();
+ if (null != resultException) {
+ exception = getRpcException(result.getException());
+ logger.warn(exception.getMessage(), exception);
+ if (failIndex == failThresholdIndex) {
+ break;
+ } else {
+ failIndex++;
+ }
+ }
+ }
} catch (Throwable e) {
- exception = new RpcException(e.getMessage(), e);
- logger.warn(e.getMessage(), e);
+ exception = getRpcException(e);
+ logger.warn(exception.getMessage(), exception);
+ if (failIndex == failThresholdIndex) {
+ break;
+ } else {
+ failIndex++;
+ }
}
}
+
if (exception != null) {
+ if (failIndex == failThresholdIndex) {
+ logger.debug(
+ String.format("The number of BroadcastCluster call failures has reached the threshold %s", failThresholdIndex));
+ } else {
+ logger.debug(String.format("The number of BroadcastCluster call failures has not reached the threshold %s, fail size is %s",
+ failIndex));
+ }
throw exception;
}
+
return result;
}
+ private RpcException getRpcException(Throwable throwable) {
+ RpcException rpcException = null;
+ if (throwable instanceof RpcException) {
+ rpcException = (RpcException) throwable;
+ } else {
+ rpcException = new RpcException(throwable.getMessage(), throwable);
+ }
+ return rpcException;
+ }
}