You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/02/22 06:01:34 UTC

[dubbo] branch master updated: fix the bug of broadcast cluster invoke (#7174)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new c24ccf0  fix the bug of broadcast cluster invoke (#7174)
c24ccf0 is described below

commit c24ccf0d1c51a035eb3680d59894195a935b0390
Author: xiaoheng1 <20...@qq.com>
AuthorDate: Mon Feb 22 14:01:24 2021 +0800

    fix the bug of broadcast cluster invoke (#7174)
---
 .../cluster/support/BroadcastClusterInvoker.java   | 62 +++++++++++++++++++---
 1 file changed, 56 insertions(+), 6 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastClusterInvoker.java
index ab0623e..a2b7cad 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/BroadcastClusterInvoker.java
@@ -16,6 +16,7 @@
  */
 package org.apache.dubbo.rpc.cluster.support;
 
+import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.rpc.Invocation;
@@ -26,15 +27,18 @@ import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.cluster.Directory;
 import org.apache.dubbo.rpc.cluster.LoadBalance;
 
+import java.text.Format;
 import java.util.List;
 
 /**
  * BroadcastClusterInvoker
- *
  */
 public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
 
     private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
+    private static final String BROADCAST_FAIL_PERCENT_KEY = "broadcast.fail.percent";
+    private static final int MAX_BROADCAST_FAIL_PERCENT = 100;
+    private static final int MIN_BROADCAST_FAIL_PERCENT = 0;
 
     public BroadcastClusterInvoker(Directory<T> directory) {
         super(directory);
@@ -47,21 +51,67 @@ public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
         RpcContext.getContext().setInvokers((List) invokers);
         RpcException exception = null;
         Result result = null;
+        URL url = getUrl();
+        // The value range of broadcast.fail.threshold must be 0~100.
+        // 100 means that an exception will be thrown last, and 0 means that as long as an exception occurs, it will be thrown.
+        // see https://github.com/apache/dubbo/pull/7174
+        int broadcastFailPercent = url.getParameter(BROADCAST_FAIL_PERCENT_KEY, MAX_BROADCAST_FAIL_PERCENT);
+
+        if (broadcastFailPercent < MIN_BROADCAST_FAIL_PERCENT || broadcastFailPercent > MAX_BROADCAST_FAIL_PERCENT) {
+            logger.info(String.format("The value corresponding to the broadcast.fail.percent parameter must be between 0 and 100. " +
+                    "The current setting is %s, which is reset to 100.", broadcastFailPercent));
+            broadcastFailPercent = MAX_BROADCAST_FAIL_PERCENT;
+        }
+
+        int failThresholdIndex = invokers.size() * broadcastFailPercent / MAX_BROADCAST_FAIL_PERCENT;
+        int failIndex = 0;
         for (Invoker<T> invoker : invokers) {
             try {
                 result = invoker.invoke(invocation);
-            } catch (RpcException e) {
-                exception = e;
-                logger.warn(e.getMessage(), e);
+                if (null != result && result.hasException()) {
+                    Throwable resultException = result.getException();
+                    if (null != resultException) {
+                        exception = getRpcException(result.getException());
+                        logger.warn(exception.getMessage(), exception);
+                        if (failIndex == failThresholdIndex) {
+                            break;
+                        } else {
+                            failIndex++;
+                        }
+                    }
+                }
             } catch (Throwable e) {
-                exception = new RpcException(e.getMessage(), e);
-                logger.warn(e.getMessage(), e);
+                exception = getRpcException(e);
+                logger.warn(exception.getMessage(), exception);
+                if (failIndex == failThresholdIndex) {
+                    break;
+                } else {
+                    failIndex++;
+                }
             }
         }
+
         if (exception != null) {
+            if (failIndex == failThresholdIndex) {
+                logger.debug(
+                        String.format("The number of BroadcastCluster call failures has reached the threshold %s", failThresholdIndex));
+            } else {
+                logger.debug(String.format("The number of BroadcastCluster call failures has not reached the threshold %s, fail size is %s",
+                        failIndex));
+            }
             throw exception;
         }
+
         return result;
     }
 
+    private RpcException getRpcException(Throwable throwable) {
+        RpcException rpcException = null;
+        if (throwable instanceof RpcException) {
+            rpcException = (RpcException) throwable;
+        } else {
+            rpcException = new RpcException(throwable.getMessage(), throwable);
+        }
+        return rpcException;
+    }
 }