You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2018/07/31 05:13:53 UTC

[incubator-dubbo] branch master updated: Merge pull request #2125, binding attachments into invocation so that a rpc invoke have a same invocation.

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 971d80d  Merge pull request #2125, binding attachments into invocation so that a rpc invoke have a same invocation.
971d80d is described below

commit 971d80d55da692690135dead98f9ce2001023e60
Author: 时无两丶 <44...@qq.com>
AuthorDate: Tue Jul 31 13:13:51 2018 +0800

    Merge pull request #2125, binding attachments into invocation so that a rpc invoke have a same invocation.
    
    Fixes #1978
---
 .../cluster/support/AbstractClusterInvoker.java    | 15 +++-
 .../rpc/cluster/support/ForkingClusterInvoker.java | 82 ++++++++++++----------
 .../support/AbstractClusterInvokerTest.java        | 27 +++++++
 .../support/FailoverClusterInvokerTest.java        |  2 +-
 .../cluster/support/ForkingClusterInvokerTest.java | 32 ++++++++-
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |  2 +-
 6 files changed, 113 insertions(+), 47 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
index a4a1905..2e3ab14 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
@@ -25,6 +25,8 @@ import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.cluster.Directory;
@@ -33,11 +35,11 @@ import org.apache.dubbo.rpc.support.RpcUtils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * AbstractClusterInvoker
- *
  */
 public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
 
@@ -92,10 +94,10 @@ public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
 
     /**
      * Select a invoker using loadbalance policy.</br>
-     * a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or, 
+     * a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or,
      * if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker</br>
      * b)Reslection, the validation rule for reselection: selected > available. This rule guarantees that
-     * the selected invoker has the minimum chance to be one in the previously selected list, and also 
+     * the selected invoker has the minimum chance to be one in the previously selected list, and also
      * guarantees this invoker is available.
      *
      * @param loadbalance load balance policy
@@ -225,6 +227,13 @@ public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
     public Result invoke(final Invocation invocation) throws RpcException {
         checkWhetherDestroyed();
         LoadBalance loadbalance = null;
+
+        // binding attachments into invocation.
+        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
+        if (contextAttachments != null && contextAttachments.size() != 0) {
+            ((RpcInvocation) invocation).addAttachments(contextAttachments);
+        }
+
         List<Invoker<T>> invokers = list(invocation);
         if (invokers != null && !invokers.isEmpty()) {
             loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
index 4cbaa02..28b4471 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
  * Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
  *
  * <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
- *
  */
 public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
 
@@ -57,50 +56,55 @@ public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
     @Override
     @SuppressWarnings({"unchecked", "rawtypes"})
     public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
-        checkInvokers(invokers, invocation);
-        final List<Invoker<T>> selected;
-        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
-        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
-        if (forks <= 0 || forks >= invokers.size()) {
-            selected = invokers;
-        } else {
-            selected = new ArrayList<Invoker<T>>();
-            for (int i = 0; i < forks; i++) {
-                // TODO. Add some comment here, refer chinese version for more details.
-                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
-                if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
-                    selected.add(invoker);
+        try {
+            checkInvokers(invokers, invocation);
+            final List<Invoker<T>> selected;
+            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
+            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
+            if (forks <= 0 || forks >= invokers.size()) {
+                selected = invokers;
+            } else {
+                selected = new ArrayList<Invoker<T>>();
+                for (int i = 0; i < forks; i++) {
+                    // TODO. Add some comment here, refer chinese version for more details.
+                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
+                    if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
+                        selected.add(invoker);
+                    }
                 }
             }
-        }
-        RpcContext.getContext().setInvokers((List) selected);
-        final AtomicInteger count = new AtomicInteger();
-        final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
-        for (final Invoker<T> invoker : selected) {
-            executor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        Result result = invoker.invoke(invocation);
-                        ref.offer(result);
-                    } catch (Throwable e) {
-                        int value = count.incrementAndGet();
-                        if (value >= selected.size()) {
-                            ref.offer(e);
+            RpcContext.getContext().setInvokers((List) selected);
+            final AtomicInteger count = new AtomicInteger();
+            final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
+            for (final Invoker<T> invoker : selected) {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Result result = invoker.invoke(invocation);
+                            ref.offer(result);
+                        } catch (Throwable e) {
+                            int value = count.incrementAndGet();
+                            if (value >= selected.size()) {
+                                ref.offer(e);
+                            }
                         }
                     }
+                });
+            }
+            try {
+                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
+                if (ret instanceof Throwable) {
+                    Throwable e = (Throwable) ret;
+                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                 }
-            });
-        }
-        try {
-            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
-            if (ret instanceof Throwable) {
-                Throwable e = (Throwable) ret;
-                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
+                return (Result) ret;
+            } catch (InterruptedException e) {
+                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
             }
-            return (Result) ret;
-        } catch (InterruptedException e) {
-            throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
+        } finally {
+            // clear attachments which is binding to current thread.
+            RpcContext.getContext().clearAttachments();
         }
     }
 }
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
index 37f2f27..c249131 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.cluster.Directory;
@@ -132,6 +133,32 @@ public class AbstractClusterInvokerTest {
 
     }
 
+
+    @Test
+    public void testBindingAttachment() {
+        final String attachKey = "attach";
+        final String attachValue = "value";
+
+        // setup attachment
+        RpcContext.getContext().setAttachment(attachKey, attachValue);
+        Map<String, String> attachments = RpcContext.getContext().getAttachments();
+        Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);
+
+        cluster = new AbstractClusterInvoker(dic) {
+            @Override
+            protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance)
+                    throws RpcException {
+                // attachment will be bind to invocation
+                String value = invocation.getAttachment(attachKey);
+                Assert.assertTrue("binding attachment failed!", value != null && value.equals(attachValue));
+                return null;
+            }
+        };
+
+        // invoke
+        cluster.invoke(invocation);
+    }
+
     @Test
     public void testSelect_Invokersize0() throws Exception {
         {
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
index 28ba182..136ba2a 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
@@ -166,7 +166,7 @@ public class FailoverClusterInvokerTest {
      * then we should reselect from the latest invokers before retry.
      */
     @Test
-    public void testInvokerDestoryAndReList() {
+    public void testInvokerDestroyAndReList() {
         final URL url = URL.valueOf("test://localhost/" + Demo.class.getName() + "?loadbalance=roundrobin&retries=" + retries);
         RpcException exception = new RpcException(RpcException.TIMEOUT_EXCEPTION);
         MockInvoker<Demo> invoker1 = new MockInvoker<Demo>(Demo.class, url);
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
index 97dcbcd..b39a7ba 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.RpcResult;
+import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.cluster.Directory;
 
 import org.junit.Assert;
@@ -30,6 +31,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertFalse;
 import static org.mockito.BDDMockito.given;
@@ -37,7 +39,6 @@ import static org.mockito.Mockito.mock;
 
 /**
  * ForkingClusterInvokerTest
- *
  */
 @SuppressWarnings("unchecked")
 public class ForkingClusterInvokerTest {
@@ -71,6 +72,7 @@ public class ForkingClusterInvokerTest {
         invokers.add(invoker3);
 
     }
+
     private void resetInvokerToException() {
         given(invoker1.invoke(invocation)).willThrow(new RuntimeException());
         given(invoker1.getUrl()).willReturn(url);
@@ -106,7 +108,7 @@ public class ForkingClusterInvokerTest {
     }
 
     @Test
-    public void testInvokeExceptoin() {
+    public void testInvokeException() {
         resetInvokerToException();
         ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
                 dic);
@@ -120,8 +122,32 @@ public class ForkingClusterInvokerTest {
         }
     }
 
+    @Test
+    public void testClearRpcContext() {
+        resetInvokerToException();
+        ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
+                dic);
+
+        String attachKey = "attach";
+        String attachValue = "value";
+
+        RpcContext.getContext().setAttachment(attachKey, attachValue);
+
+        Map<String, String> attachments = RpcContext.getContext().getAttachments();
+        Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);
+        try {
+            invoker.invoke(invocation);
+            Assert.fail();
+        } catch (RpcException expected) {
+            Assert.assertTrue("Successed to forking invoke provider !", expected.getMessage().contains("Failed to forking invoke provider"));
+            assertFalse(expected.getCause() instanceof RpcException);
+        }
+        Map<String, String> afterInvoke = RpcContext.getContext().getAttachments();
+        Assert.assertTrue("clear attachment failed!", afterInvoke != null && afterInvoke.size() == 0);
+    }
+
     @Test()
-    public void testInvokeNoExceptoin() {
+    public void testInvokeNoException() {
 
         resetInvokerToNoException();
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index a00a993..023635c 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -135,7 +135,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
             invocation.addAttachmentsIfAbsent(attachment);
         }
         Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
-        if (contextAttachments != null) {
+        if (contextAttachments != null && contextAttachments.size() != 0) {
             /**
              * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
              * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered