You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2018/07/31 05:13:04 UTC

[incubator-dubbo] branch 2.6.x updated: Merge pull request #2024, binding attachment before a clusterInvoker invoke.

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/2.6.x by this push:
     new 0278a01  Merge pull request #2024, binding attachment before a clusterInvoker invoke.
0278a01 is described below

commit 0278a0145fddfffcdf1c35e1812fc90a1573bc7f
Author: 时无两丶 <44...@qq.com>
AuthorDate: Tue Jul 31 13:12:56 2018 +0800

    Merge pull request #2024, binding attachment before a clusterInvoker invoke.
    
    Fixes #1978
---
 .../cluster/support/AbstractClusterInvoker.java    | 10 +++
 .../rpc/cluster/support/ForkingClusterInvoker.java | 81 ++++++++++++----------
 .../support/AbstractClusterInvokerTest.java        | 26 +++++++
 .../support/FailoverClusterInvokerTest.java        |  2 +-
 .../cluster/support/ForkingClusterInvokerTest.java | 41 +++++++++--
 .../dubbo/rpc/protocol/AbstractInvoker.java        |  2 +-
 6 files changed, 116 insertions(+), 46 deletions(-)

diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
index 9bd3497..d1b58ac 100644
--- a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
@@ -25,6 +25,8 @@ import com.alibaba.dubbo.common.logger.LoggerFactory;
 import com.alibaba.dubbo.common.utils.NetUtils;
 import com.alibaba.dubbo.rpc.Invocation;
 import com.alibaba.dubbo.rpc.Invoker;
+import com.alibaba.dubbo.rpc.RpcContext;
+import com.alibaba.dubbo.rpc.RpcInvocation;
 import com.alibaba.dubbo.rpc.Result;
 import com.alibaba.dubbo.rpc.RpcException;
 import com.alibaba.dubbo.rpc.cluster.Directory;
@@ -33,6 +35,7 @@ import com.alibaba.dubbo.rpc.support.RpcUtils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -225,6 +228,13 @@ public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
     public Result invoke(final Invocation invocation) throws RpcException {
         checkWhetherDestroyed();
         LoadBalance loadbalance = null;
+
+        // bind the RpcContext attachments to the invocation.
+        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
+        if (contextAttachments != null && contextAttachments.size() != 0) {
+            ((RpcInvocation) invocation).addAttachments(contextAttachments);
+        }
+
         List<Invoker<T>> invokers = list(invocation);
         if (invokers != null && !invokers.isEmpty()) {
             loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
index 7d9a58d..ae652f7 100644
--- a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
@@ -57,50 +57,55 @@ public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
     @Override
     @SuppressWarnings({"unchecked", "rawtypes"})
     public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
-        checkInvokers(invokers, invocation);
-        final List<Invoker<T>> selected;
-        final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
-        final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
-        if (forks <= 0 || forks >= invokers.size()) {
-            selected = invokers;
-        } else {
-            selected = new ArrayList<Invoker<T>>();
-            for (int i = 0; i < forks; i++) {
-                // TODO. Add some comment here, refer chinese version for more details.
-                Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
-                if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
-                    selected.add(invoker);
+        try {
+            checkInvokers(invokers, invocation);
+            final List<Invoker<T>> selected;
+            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
+            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
+            if (forks <= 0 || forks >= invokers.size()) {
+                selected = invokers;
+            } else {
+                selected = new ArrayList<Invoker<T>>();
+                for (int i = 0; i < forks; i++) {
+                    // TODO: add a comment here; refer to the Chinese version for more details.
+                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
+                    if (!selected.contains(invoker)) {// Avoid adding the same invoker several times.
+                        selected.add(invoker);
+                    }
                 }
             }
-        }
-        RpcContext.getContext().setInvokers((List) selected);
-        final AtomicInteger count = new AtomicInteger();
-        final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
-        for (final Invoker<T> invoker : selected) {
-            executor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        Result result = invoker.invoke(invocation);
-                        ref.offer(result);
-                    } catch (Throwable e) {
-                        int value = count.incrementAndGet();
-                        if (value >= selected.size()) {
-                            ref.offer(e);
+            RpcContext.getContext().setInvokers((List) selected);
+            final AtomicInteger count = new AtomicInteger();
+            final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
+            for (final Invoker<T> invoker : selected) {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Result result = invoker.invoke(invocation);
+                            ref.offer(result);
+                        } catch (Throwable e) {
+                            int value = count.incrementAndGet();
+                            if (value >= selected.size()) {
+                                ref.offer(e);
+                            }
                         }
                     }
+                });
+            }
+            try {
+                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
+                if (ret instanceof Throwable) {
+                    Throwable e = (Throwable) ret;
+                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                 }
-            });
-        }
-        try {
-            Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
-            if (ret instanceof Throwable) {
-                Throwable e = (Throwable) ret;
-                throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
+                return (Result) ret;
+            } catch (InterruptedException e) {
+                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
             }
-            return (Result) ret;
-        } catch (InterruptedException e) {
-            throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
+        } finally {
+            // clear the attachments that are bound to the current thread.
+            RpcContext.getContext().clearAttachments();
         }
     }
 }
diff --git a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
index 611683c..cac83e9 100644
--- a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
@@ -23,6 +23,7 @@ import com.alibaba.dubbo.common.utils.NetUtils;
 import com.alibaba.dubbo.rpc.Invocation;
 import com.alibaba.dubbo.rpc.Invoker;
 import com.alibaba.dubbo.rpc.Result;
+import com.alibaba.dubbo.rpc.RpcContext;
 import com.alibaba.dubbo.rpc.RpcException;
 import com.alibaba.dubbo.rpc.RpcInvocation;
 import com.alibaba.dubbo.rpc.cluster.Directory;
@@ -133,6 +134,31 @@ public class AbstractClusterInvokerTest {
     }
 
     @Test
+    public void testBindingAttachment() {
+        final String attachKey = "attach";
+        final String attachValue = "value";
+
+        // setup attachment
+        RpcContext.getContext().setAttachment(attachKey, attachValue);
+        Map<String, String> attachments = RpcContext.getContext().getAttachments();
+        Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);
+
+        cluster = new AbstractClusterInvoker(dic) {
+            @Override
+            protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance)
+                    throws RpcException {
+                // the attachment should have been bound to the invocation
+                String value = invocation.getAttachment(attachKey);
+                Assert.assertTrue("binding attachment failed!", value != null && value.equals(attachValue));
+                return null;
+            }
+        };
+
+        // invoke
+        cluster.invoke(invocation);
+    }
+
+    @Test
     public void testSelect_Invokersize0() throws Exception {
         {
             Invoker invoker = cluster.select(null, null, null, null);
diff --git a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
index f41864f..00a1de4 100644
--- a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
@@ -166,7 +166,7 @@ public class FailoverClusterInvokerTest {
      * then we should reselect from the latest invokers before retry.
      */
     @Test
-    public void testInvokerDestoryAndReList() {
+    public void testInvokerDestroyAndReList() {
         final URL url = URL.valueOf("test://localhost/" + Demo.class.getName() + "?loadbalance=roundrobin&retries=" + retries);
         RpcException exception = new RpcException(RpcException.TIMEOUT_EXCEPTION);
         MockInvoker<Demo> invoker1 = new MockInvoker<Demo>(Demo.class, url);
diff --git a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
index 5a6886c..066c01a 100644
--- a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
@@ -19,25 +19,29 @@ package com.alibaba.dubbo.rpc.cluster.support;
 import com.alibaba.dubbo.common.URL;
 import com.alibaba.dubbo.rpc.Invoker;
 import com.alibaba.dubbo.rpc.Result;
+import com.alibaba.dubbo.rpc.RpcContext;
 import com.alibaba.dubbo.rpc.RpcException;
 import com.alibaba.dubbo.rpc.RpcInvocation;
 import com.alibaba.dubbo.rpc.RpcResult;
 import com.alibaba.dubbo.rpc.cluster.Directory;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 
 /**
  * ForkingClusterInvokerTest
- *
  */
 @SuppressWarnings("unchecked")
 public class ForkingClusterInvokerTest {
@@ -71,6 +75,7 @@ public class ForkingClusterInvokerTest {
         invokers.add(invoker3);
 
     }
+
     private void resetInvokerToException() {
         given(invoker1.invoke(invocation)).willThrow(new RuntimeException());
         given(invoker1.getUrl()).willReturn(url);
@@ -106,7 +111,7 @@ public class ForkingClusterInvokerTest {
     }
 
     @Test
-    public void testInvokeExceptoin() {
+    public void testInvokeException() {
         resetInvokerToException();
         ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
                 dic);
@@ -115,20 +120,44 @@ public class ForkingClusterInvokerTest {
             invoker.invoke(invocation);
             Assert.fail();
         } catch (RpcException expected) {
-            Assert.assertTrue(expected.getMessage().contains("Failed to forking invoke provider"));
+            assertThat(expected.getMessage().contains("Failed to forking invoke provider"), is(true));
+            assertFalse(expected.getCause() instanceof RpcException);
+        }
+    }
+
+    @Test
+    public void testClearRpcContext() {
+        resetInvokerToException();
+        ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
+                dic);
+
+        String attachKey = "attach";
+        String attachValue = "value";
+
+        RpcContext.getContext().setAttachment(attachKey, attachValue);
+
+        Map<String, String> attachments = RpcContext.getContext().getAttachments();
+        assertThat("set attachment failed!", attachments != null && attachments.size() == 1, is(true));
+        try {
+            invoker.invoke(invocation);
+            Assert.fail();
+        } catch (RpcException expected) {
+            assertThat(expected.getMessage().contains("Failed to forking invoke provider"), is(true));
             assertFalse(expected.getCause() instanceof RpcException);
         }
+        Map<String, String> afterInvoke = RpcContext.getContext().getAttachments();
+        assertThat(afterInvoke != null && afterInvoke.size() == 0, is(true));
     }
 
     @Test()
-    public void testInvokeNoExceptoin() {
+    public void testInvokeNoException() {
 
         resetInvokerToNoException();
 
         ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
                 dic);
         Result ret = invoker.invoke(invocation);
-        Assert.assertSame(result, ret);
+        assertSame(result, ret);
     }
 
 }
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java
index dd5c158..2e4980b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java
@@ -135,7 +135,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
             invocation.addAttachmentsIfAbsent(attachment);
         }
         Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
-        if (contextAttachments != null) {
+        if (contextAttachments != null && contextAttachments.size() != 0) {
             /**
              * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
              * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered