You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2018/07/31 05:13:53 UTC
[incubator-dubbo] branch master updated: Merge pull request #2125,
binding attachments into invocation so that a rpc invoke have a
same invocation.
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 971d80d Merge pull request #2125, binding attachments into invocation so that a rpc invoke have a same invocation.
971d80d is described below
commit 971d80d55da692690135dead98f9ce2001023e60
Author: 时无两丶 <44...@qq.com>
AuthorDate: Tue Jul 31 13:13:51 2018 +0800
Merge pull request #2125, binding attachments into invocation so that a rpc invoke have a same invocation.
Fixes #1978
---
.../cluster/support/AbstractClusterInvoker.java | 15 +++-
.../rpc/cluster/support/ForkingClusterInvoker.java | 82 ++++++++++++----------
.../support/AbstractClusterInvokerTest.java | 27 +++++++
.../support/FailoverClusterInvokerTest.java | 2 +-
.../cluster/support/ForkingClusterInvokerTest.java | 32 ++++++++-
.../apache/dubbo/rpc/protocol/AbstractInvoker.java | 2 +-
6 files changed, 113 insertions(+), 47 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
index a4a1905..2e3ab14 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
@@ -25,6 +25,8 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
+import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Directory;
@@ -33,11 +35,11 @@ import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* AbstractClusterInvoker
- *
*/
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
@@ -92,10 +94,10 @@ public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
/**
* Select a invoker using loadbalance policy.</br>
- * a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or,
+ * a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or,
* if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker</br>
* b)Reslection, the validation rule for reselection: selected > available. This rule guarantees that
- * the selected invoker has the minimum chance to be one in the previously selected list, and also
+ * the selected invoker has the minimum chance to be one in the previously selected list, and also
* guarantees this invoker is available.
*
* @param loadbalance load balance policy
@@ -225,6 +227,13 @@ public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;
+
+ // binding attachments into invocation.
+ Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
+ if (contextAttachments != null && contextAttachments.size() != 0) {
+ ((RpcInvocation) invocation).addAttachments(contextAttachments);
+ }
+
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
index 4cbaa02..28b4471 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
* Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
*
* <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
- *
*/
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
@@ -57,50 +56,55 @@ public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
- checkInvokers(invokers, invocation);
- final List<Invoker<T>> selected;
- final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
- final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
- if (forks <= 0 || forks >= invokers.size()) {
- selected = invokers;
- } else {
- selected = new ArrayList<Invoker<T>>();
- for (int i = 0; i < forks; i++) {
- // TODO. Add some comment here, refer chinese version for more details.
- Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
- if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
- selected.add(invoker);
+ try {
+ checkInvokers(invokers, invocation);
+ final List<Invoker<T>> selected;
+ final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
+ final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
+ if (forks <= 0 || forks >= invokers.size()) {
+ selected = invokers;
+ } else {
+ selected = new ArrayList<Invoker<T>>();
+ for (int i = 0; i < forks; i++) {
+ // TODO. Add some comment here, refer chinese version for more details.
+ Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
+ if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
+ selected.add(invoker);
+ }
}
}
- }
- RpcContext.getContext().setInvokers((List) selected);
- final AtomicInteger count = new AtomicInteger();
- final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
- for (final Invoker<T> invoker : selected) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- Result result = invoker.invoke(invocation);
- ref.offer(result);
- } catch (Throwable e) {
- int value = count.incrementAndGet();
- if (value >= selected.size()) {
- ref.offer(e);
+ RpcContext.getContext().setInvokers((List) selected);
+ final AtomicInteger count = new AtomicInteger();
+ final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
+ for (final Invoker<T> invoker : selected) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Result result = invoker.invoke(invocation);
+ ref.offer(result);
+ } catch (Throwable e) {
+ int value = count.incrementAndGet();
+ if (value >= selected.size()) {
+ ref.offer(e);
+ }
}
}
+ });
+ }
+ try {
+ Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
+ if (ret instanceof Throwable) {
+ Throwable e = (Throwable) ret;
+ throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
- });
- }
- try {
- Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
- if (ret instanceof Throwable) {
- Throwable e = (Throwable) ret;
- throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
+ return (Result) ret;
+ } catch (InterruptedException e) {
+ throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
- return (Result) ret;
- } catch (InterruptedException e) {
- throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
+ } finally {
+ // clear the attachments that are bound to the current thread.
+ RpcContext.getContext().clearAttachments();
}
}
}
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
index 37f2f27..c249131 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvokerTest.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Directory;
@@ -132,6 +133,32 @@ public class AbstractClusterInvokerTest {
}
+
+ @Test
+ public void testBindingAttachment() {
+ final String attachKey = "attach";
+ final String attachValue = "value";
+
+ // setup attachment
+ RpcContext.getContext().setAttachment(attachKey, attachValue);
+ Map<String, String> attachments = RpcContext.getContext().getAttachments();
+ Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);
+
+ cluster = new AbstractClusterInvoker(dic) {
+ @Override
+ protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance)
+ throws RpcException {
+ // the attachment will be bound to the invocation
+ String value = invocation.getAttachment(attachKey);
+ Assert.assertTrue("binding attachment failed!", value != null && value.equals(attachValue));
+ return null;
+ }
+ };
+
+ // invoke
+ cluster.invoke(invocation);
+ }
+
@Test
public void testSelect_Invokersize0() throws Exception {
{
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
index 28ba182..136ba2a 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
@@ -166,7 +166,7 @@ public class FailoverClusterInvokerTest {
* then we should reselect from the latest invokers before retry.
*/
@Test
- public void testInvokerDestoryAndReList() {
+ public void testInvokerDestroyAndReList() {
final URL url = URL.valueOf("test://localhost/" + Demo.class.getName() + "?loadbalance=roundrobin&retries=" + retries);
RpcException exception = new RpcException(RpcException.TIMEOUT_EXCEPTION);
MockInvoker<Demo> invoker1 = new MockInvoker<Demo>(Demo.class, url);
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
index 97dcbcd..b39a7ba 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
+import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.Directory;
import org.junit.Assert;
@@ -30,6 +31,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import static org.junit.Assert.assertFalse;
import static org.mockito.BDDMockito.given;
@@ -37,7 +39,6 @@ import static org.mockito.Mockito.mock;
/**
* ForkingClusterInvokerTest
- *
*/
@SuppressWarnings("unchecked")
public class ForkingClusterInvokerTest {
@@ -71,6 +72,7 @@ public class ForkingClusterInvokerTest {
invokers.add(invoker3);
}
+
private void resetInvokerToException() {
given(invoker1.invoke(invocation)).willThrow(new RuntimeException());
given(invoker1.getUrl()).willReturn(url);
@@ -106,7 +108,7 @@ public class ForkingClusterInvokerTest {
}
@Test
- public void testInvokeExceptoin() {
+ public void testInvokeException() {
resetInvokerToException();
ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
dic);
@@ -120,8 +122,32 @@ public class ForkingClusterInvokerTest {
}
}
+ @Test
+ public void testClearRpcContext() {
+ resetInvokerToException();
+ ForkingClusterInvoker<ForkingClusterInvokerTest> invoker = new ForkingClusterInvoker<ForkingClusterInvokerTest>(
+ dic);
+
+ String attachKey = "attach";
+ String attachValue = "value";
+
+ RpcContext.getContext().setAttachment(attachKey, attachValue);
+
+ Map<String, String> attachments = RpcContext.getContext().getAttachments();
+ Assert.assertTrue("set attachment failed!", attachments != null && attachments.size() == 1);
+ try {
+ invoker.invoke(invocation);
+ Assert.fail();
+ } catch (RpcException expected) {
+ Assert.assertTrue("Successed to forking invoke provider !", expected.getMessage().contains("Failed to forking invoke provider"));
+ assertFalse(expected.getCause() instanceof RpcException);
+ }
+ Map<String, String> afterInvoke = RpcContext.getContext().getAttachments();
+ Assert.assertTrue("clear attachment failed!", afterInvoke != null && afterInvoke.size() == 0);
+ }
+
@Test()
- public void testInvokeNoExceptoin() {
+ public void testInvokeNoException() {
resetInvokerToNoException();
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index a00a993..023635c 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -135,7 +135,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
- if (contextAttachments != null) {
+ if (contextAttachments != null && contextAttachments.size() != 0) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered