You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2018/07/31 05:13:04 UTC
[incubator-dubbo] branch 2.6.x updated: Merge pull request #2024,
binding attachment before a clusterInvoker invoke.
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 2.6.x
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/2.6.x by this push:
new 0278a01 Merge pull request #2024, binding attachment before a clusterInvoker invoke.
0278a01 is described below
commit 0278a0145fddfffcdf1c35e1812fc90a1573bc7f
Author: 时无两丶 <44...@qq.com>
AuthorDate: Tue Jul 31 13:12:56 2018 +0800
Merge pull request #2024, binding attachment before a clusterInvoker invoke.
Fixes #1978
---
.../cluster/support/AbstractClusterInvoker.java | 10 +++
.../rpc/cluster/support/ForkingClusterInvoker.java | 81 ++++++++++++----------
.../support/AbstractClusterInvokerTest.java | 26 +++++++
.../support/FailoverClusterInvokerTest.java | 2 +-
.../cluster/support/ForkingClusterInvokerTest.java | 41 +++++++++--
.../dubbo/rpc/protocol/AbstractInvoker.java | 2 +-
6 files changed, 116 insertions(+), 46 deletions(-)
diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
index 9bd3497..d1b58ac 100644
--- a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
@@ -25,6 +25,8 @@ import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
+import com.alibaba.dubbo.rpc.RpcContext;
+import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.Directory;
@@ -33,6 +35,7 @@ import com.alibaba.dubbo.rpc.support.RpcUtils;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -225,6 +228,13 @@ public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;
+
+ // bind the RpcContext attachments into the invocation.
+ Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
+ if (contextAttachments != null && contextAttachments.size() != 0) {
+ ((RpcInvocation) invocation).addAttachments(contextAttachments);
+ }
+
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
index 7d9a58d..ae652f7 100644
--- a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
@@ -57,50 +57,55 @@ public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
- checkInvokers(invokers, invocation);
- final List<Invoker<T>> selected;
- final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
- final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
- if (forks <= 0 || forks >= invokers.size()) {
- selected = invokers;
- } else {
- selected = new ArrayList<Invoker<T>>();
- for (int i = 0; i < forks; i++) {
- // TODO. Add some comment here, refer chinese version for more details.
- Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
- if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
- selected.add(invoker);
+ try {
+ checkInvokers(invokers, invocation);
+ final List<Invoker<T>> selected;
+ final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
+ final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
+ if (forks <= 0 || forks >= invokers.size()) {
+ selected = invokers;
+ } else {
+ selected = new ArrayList<Invoker<T>>();
+ for (int i = 0; i < forks; i++) {
+ // TODO. Add some comment here, refer chinese version for more details.
+ Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
+ if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
+ selected.add(invoker);
+ }
}
}
- }
- RpcContext.getContext().setInvokers((List) selected);
- final AtomicInteger count = new AtomicInteger();
- final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
- for (final Invoker<T> invoker : selected) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- Result result = invoker.invoke(invocation);
- ref.offer(result);
- } catch (Throwable e) {
- int value = count.incrementAndGet();
- if (value >= selected.size()) {
- ref.offer(e);
+ RpcContext.getContext().setInvokers((List) selected);
+ final AtomicInteger count = new AtomicInteger();
+ final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
+ for (final Invoker<T> invoker : selected) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Result result = invoker.invoke(invocation);
+ ref.offer(result);
+ } catch (Throwable e) {
+ int value = count.incrementAndGet();
+ if (value >= selected.size()) {
+ ref.offer(e);
+ }
}
}
+ });
+ }
+ try {
+ Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
+ if (ret instanceof Throwable) {
+ Throwable e = (Throwable) ret;
+ throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
- });
- }
- try {
- Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
- if (ret instanceof Throwable) {
- Throwable e = (Throwable) ret;
- throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
+ return (Result) ret;
+ } catch (InterruptedException e) {
+ throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
- return (Result) ret;
- } catch (InterruptedException e) {
- throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
+ } finally {
+ // clear the attachments that are bound to the current thread.
+ RpcContext.getContext().clearAttachments();
}
}
}
diff --git a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
index 611683c..cac83e9 100644
--- a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
@@ -23,6 +23,7 @@ import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
+import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.cluster.Directory;
@@ -133,6 +134,31 @@ public class AbstractClusterInvokerTest {
}
@Test
+ public void testBindingAttachment() {
+ final String attachKey = "attach";
+ final String attachValue = "value";
+
+ // setup attachment
+ RpcContext.getContext().setAttachment(attachKey, attachValue);
+ Map<String, String> attachments = RpcContext.getContext().getAttachments();
+ Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);
+
+ cluster = new AbstractClusterInvoker(dic) {
+ @Override
+ protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance)
+ throws RpcException {
+ // the attachment will be bound to the invocation
+ String value = invocation.getAttachment(attachKey);
+ Assert.assertTrue("binding attachment failed!", value != null && value.equals(attachValue));
+ return null;
+ }
+ };
+
+ // invoke
+ cluster.invoke(invocation);
+ }
+
+ @Test
public void testSelect_Invokersize0() throws Exception {
{
Invoker invoker = cluster.select(null, null, null, null);
diff --git a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
index f41864f..00a1de4 100644
--- a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
@@ -166,7 +166,7 @@ public class FailoverClusterInvokerTest {
* then we should reselect from the latest invokers before retry.
*/
@Test
- public void testInvokerDestoryAndReList() {
+ public void testInvokerDestroyAndReList() {
final URL url = URL.valueOf("test://localhost/" + Demo.class.getName() + "?loadbalance=roundrobin&retries=" + retries);
RpcException exception = new RpcException(RpcException.TIMEOUT_EXCEPTION);
MockInvoker<Demo> invoker1 = new MockInvoker<Demo>(Demo.class, url);
diff --git a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
index 5a6886c..066c01a 100644
--- a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
@@ -19,25 +19,29 @@ package com.alibaba.dubbo.rpc.cluster.support;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
+import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.cluster.Directory;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
/**
* ForkingClusterInvokerTest
- *
*/
@SuppressWarnings("unchecked")
public class ForkingClusterInvokerTest {
@@ -71,6 +75,7 @@ public class ForkingClusterInvokerTest {
invokers.add(invoker3);
}
+
private void resetInvokerToException() {
given(invoker1.invoke(invocation)).willThrow(new RuntimeException());
given(invoker1.getUrl()).willReturn(url);
@@ -106,7 +111,7 @@ public class ForkingClusterInvokerTest {
}
@Test
- public void testInvokeExceptoin() {
+ public void testInvokeException() {
resetInvokerToException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);
@@ -115,20 +120,44 @@ public class ForkingClusterInvokerTest {
invoker.invoke(invocation);
Assert.fail();
} catch (RpcException expected) {
- Assert.assertTrue(expected.getMessage().contains("Failed to forking invoke provider"));
+ assertThat(expected.getMessage().contains("Failed to forking invoke provider"), is(true));
+ assertFalse(expected.getCause() instanceof RpcException);
+ }
+ }
+
+ @Test
+ public void testClearRpcContext() {
+ resetInvokerToException();
+ ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
+ dic);
+
+ String attachKey = "attach";
+ String attachValue = "value";
+
+ RpcContext.getContext().setAttachment(attachKey, attachValue);
+
+ Map<String, String> attachments = RpcContext.getContext().getAttachments();
+ assertThat("set attachment failed!", attachments != null && attachments.size() == 1, is(true));
+ try {
+ invoker.invoke(invocation);
+ Assert.fail();
+ } catch (RpcException expected) {
+ assertThat(expected.getMessage().contains("Failed to forking invoke provider"), is(true));
assertFalse(expected.getCause() instanceof RpcException);
}
+ Map<String, String> afterInvoke = RpcContext.getContext().getAttachments();
+ assertThat(afterInvoke != null && afterInvoke.size() == 0, is(true));
}
@Test()
- public void testInvokeNoExceptoin() {
+ public void testInvokeNoException() {
resetInvokerToNoException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);
Result ret = invoker.invoke(invocation);
- Assert.assertSame(result, ret);
+ assertSame(result, ret);
}
}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java
index dd5c158..2e4980b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/protocol/AbstractInvoker.java
@@ -135,7 +135,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
- if (contextAttachments != null) {
+ if (contextAttachments != null && contextAttachments.size() != 0) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered