You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/12/26 09:26:52 UTC
[dubbo] branch 3.0.5-release updated: try moving finally block (#9496)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0.5-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0.5-release by this push:
new 2759f38 try moving finally block (#9496)
2759f38 is described below
commit 2759f386b1c91f284d2afbb478b6a1943885ce65
Author: ken.lj <ke...@gmail.com>
AuthorDate: Sun Dec 26 17:26:13 2021 +0800
try moving finally block (#9496)
---
.../filter/support/ConsumerContextFilter.java | 37 ++++++++++--------
.../dubbo/common/constants/CommonConstants.java | 2 +
.../org/apache/dubbo/rpc/AsyncContextImpl.java | 2 +-
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 17 +++++----
.../main/java/org/apache/dubbo/rpc/RpcContext.java | 44 +++++++++-------------
.../org/apache/dubbo/rpc/RpcServiceContext.java | 13 -------
.../org/apache/dubbo/rpc/filter/ContextFilter.java | 24 ++++++------
.../dubbo/rpc/proxy/AbstractProxyInvoker.java | 8 +++-
.../apache/dubbo/rpc/filter/ContextFilterTest.java | 6 +--
9 files changed, 72 insertions(+), 81 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
index 846a740..79eb7b8 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
@@ -93,35 +93,40 @@ public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Liste
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
- try {
- // pass default timeout set by end user (ReferenceConfig)
- Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
- if (countDown != null) {
- TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
- if (timeoutCountDown.isExpired()) {
- return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
- "No time left for making the following call: " + invocation.getServiceName() + "."
- + invocation.getMethodName() + ", terminate directly."), invocation);
- }
+ // pass default timeout set by end user (ReferenceConfig)
+ Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
+ if (countDown != null) {
+ TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
+ if (timeoutCountDown.isExpired()) {
+ return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
+ "No time left for making the following call: " + invocation.getServiceName() + "."
+ + invocation.getMethodName() + ", terminate directly."), invocation);
}
-
- RpcContext.removeServerContext();
- return invoker.invoke(invocation);
- } finally {
- RpcContext.removeServiceContext();
- RpcContext.removeClientAttachment();
}
+
+ RpcContext.removeServerContext();
+ return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
+
+ removeContext();
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+ removeContext();
+ }
+ private void removeContext() {
+ RpcContext.removeServiceContext();
+ RpcContext.removeClientAttachment();
+ // server context must not be removed because user might use it on callback.
+ // So clearing it is delayed until the start of the next rpc call; see RpcContext.removeServerContext() in invoke() above
+ // RpcContext.removeServerContext();
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 7b9de27..be5e20d 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -508,4 +508,6 @@ public interface CommonConstants {
String STAGED_CLASSLOADER_KEY = "STAGED_CLASSLOADER";
+ String PROVIDER_ASYNC_KEY = "PROVIDER_ASYNC";
+
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
index 9f9a083..4ff3368 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
@@ -31,7 +31,7 @@ public class AsyncContextImpl implements AsyncContext {
private ClassLoader stagedClassLoader;
public AsyncContextImpl() {
- restoreContext = RpcContext.storeContext(false);
+ restoreContext = RpcContext.storeContext();
restoreClassLoader = Thread.currentThread().getContextClassLoader();
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 80a46da..377432a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_ASYNC_KEY;
import static org.apache.dubbo.common.utils.ReflectUtils.defaultReturn;
/**
@@ -59,6 +60,7 @@ public class AsyncRpcResult implements Result {
private Executor executor;
private Invocation invocation;
+ private final boolean async;
private CompletableFuture<AppResponse> responseFuture;
@@ -66,10 +68,11 @@ public class AsyncRpcResult implements Result {
this.responseFuture = future;
this.invocation = invocation;
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
- if (InvokeMode.SYNC != rpcInvocation.getInvokeMode() && !future.isDone()) {
- this.storedContext = RpcContext.storeContext(false);
+ if ((rpcInvocation.get(PROVIDER_ASYNC_KEY) != null || InvokeMode.SYNC != rpcInvocation.getInvokeMode()) && !future.isDone()) {
+ async = true;
+ this.storedContext = RpcContext.clearAndStoreContext();
} else {
- this.storedContext = RpcContext.storeContext(true);
+ async = false;
}
}
@@ -198,12 +201,10 @@ public class AsyncRpcResult implements Result {
public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
- RpcContext.RestoreContext tmpContext = RpcContext.storeContext(false);
- RpcContext.restoreContext(storedContext);
-
+ if (async) {
+ RpcContext.restoreContext(storedContext);
+ }
fn.accept(v, t);
-
- RpcContext.restoreContext(tmpContext);
});
// Necessary! update future in context, see https://github.com/apache/dubbo/issues/9461
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 1db82f1..69e986e 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -803,8 +803,14 @@ public class RpcContext {
RpcServiceContext.setRpcContext(url);
}
- protected static RestoreContext storeContext(boolean needCopy) {
- return new RestoreContext(needCopy);
+ protected static RestoreContext clearAndStoreContext() {
+ RestoreContext restoreContext = new RestoreContext();
+ RpcContext.removeContext();
+ return restoreContext;
+ }
+
+ protected static RestoreContext storeContext() {
+ return new RestoreContext();
}
protected static void restoreContext(RestoreContext restoreContext) {
@@ -813,22 +819,6 @@ public class RpcContext {
}
}
- protected static void restoreClientAttachment(RpcContextAttachment oldContext) {
- CLIENT_ATTACHMENT.set(oldContext);
- }
-
- protected static void restoreServerContext(RpcContextAttachment oldServerContext) {
- SERVER_LOCAL.set(oldServerContext);
- }
-
- protected static void restoreServerAttachment(RpcContextAttachment oldServerContext) {
- SERVER_ATTACHMENT.set(oldServerContext);
- }
-
- protected static void restoreServiceContext(RpcServiceContext oldServiceContext) {
- SERVICE_CONTEXT.set(oldServiceContext);
- }
-
/**
* Used to temporarily store and restore all kinds of contexts of current thread.
*/
@@ -838,31 +828,31 @@ public class RpcContext {
private final RpcContextAttachment serverAttachment;
private final RpcContextAttachment serverLocal;
- public RestoreContext(boolean needCopy) {
- serviceContext = getServiceContext().copyOf(needCopy);
- clientAttachment = getClientAttachment().copyOf(needCopy);
- serverAttachment = getServerAttachment().copyOf(needCopy);
- serverLocal = getServerContext().copyOf(needCopy);
+ public RestoreContext() {
+ serviceContext = getServiceContext().copyOf(false);
+ clientAttachment = getClientAttachment().copyOf(false);
+ serverAttachment = getServerAttachment().copyOf(false);
+ serverLocal = getServerContext().copyOf(false);
}
public void restore() {
if (serviceContext != null) {
- restoreServiceContext(serviceContext);
+ SERVICE_CONTEXT.set(serviceContext);
} else {
removeServiceContext();
}
if (clientAttachment != null) {
- restoreClientAttachment(clientAttachment);
+ CLIENT_ATTACHMENT.set(clientAttachment);
} else {
removeClientAttachment();
}
if (serverAttachment != null) {
- restoreServerAttachment(serverAttachment);
+ SERVER_ATTACHMENT.set(serverAttachment);
} else {
removeServerAttachment();
}
if (serverLocal != null) {
- restoreServerContext(serverLocal);
+ SERVER_LOCAL.set(serverLocal);
} else {
removeServerContext();
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
index e61632d..e59ae9b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
@@ -658,10 +658,6 @@ public class RpcServiceContext extends RpcContext {
* @return a shallow copy of RpcServiceContext
*/
public RpcServiceContext copyOf(boolean needCopy) {
- if (!isValid()) {
- return this;
- }
-
if (needCopy) {
RpcServiceContext copy = new RpcServiceContext();
copy.consumerUrl = this.consumerUrl;
@@ -675,13 +671,4 @@ public class RpcServiceContext extends RpcContext {
}
}
-
- private boolean isValid() {
- return this.consumerUrl != null
- || this.localAddress != null
- || this.remoteAddress != null
- || this.invocation != null
- || this.asyncContext != null;
- }
-
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
index 3e38c67..3d6aabe 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
@@ -126,27 +126,29 @@ public class ContextFilter implements Filter, Filter.Listener {
((RpcInvocation) invocation).setInvoker(invoker);
}
- try {
- context.clearAfterEachInvoke(false);
- return invoker.invoke(invocation);
- } finally {
- context.clearAfterEachInvoke(true);
- RpcContext.removeServerAttachment();
- RpcContext.removeServiceContext();
- // IMPORTANT! For async scenario, context must be removed from current thread, so a new RpcContext is always created for the next invoke for the same thread.
- RpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY);
- RpcContext.removeServerContext();
- }
+ context.clearAfterEachInvoke(false);
+
+ return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
appResponse.addObjectAttachments(RpcContext.getServerContext().getObjectAttachments());
+ removeContext();
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+ removeContext();
+ }
+
+ private void removeContext() {
+ RpcContext.getServerAttachment().clearAfterEachInvoke(true); // TODO, not necessary anymore
+ RpcContext.removeServerAttachment();
+ RpcContext.removeClientAttachment();
+ RpcContext.removeServiceContext();
+ RpcContext.removeServerContext();
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index 336671a..84dfd33 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -32,6 +32,8 @@ import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_ASYNC_KEY;
+
/**
* This Invoker works on provider side, delegates RPC to interface implementation.
*/
@@ -82,7 +84,7 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
public Result invoke(Invocation invocation) throws RpcException {
try {
Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
- CompletableFuture<Object> future = wrapWithFuture(value);
+ CompletableFuture<Object> future = wrapWithFuture(value, invocation);
CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
AppResponse result = new AppResponse(invocation);
if (t != null) {
@@ -107,10 +109,12 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
}
}
- private CompletableFuture<Object> wrapWithFuture(Object value) {
+ private CompletableFuture<Object> wrapWithFuture(Object value, Invocation invocation) {
if (value instanceof CompletableFuture) {
+ invocation.put(PROVIDER_ASYNC_KEY, Boolean.TRUE);
return (CompletableFuture<Object>) value;
} else if (RpcContext.getServiceContext().isAsyncStarted()) {
+ invocation.put(PROVIDER_ASYNC_KEY, Boolean.TRUE);
return ((AsyncContextImpl) (RpcContext.getServiceContext().getAsyncContext())).getInternalFuture();
}
return CompletableFuture.completedFuture(value);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ContextFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ContextFilterTest.java
index 84b6a68..a37f87a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ContextFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ContextFilterTest.java
@@ -29,7 +29,7 @@ import org.apache.dubbo.rpc.support.MyInvoker;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
@@ -62,7 +62,7 @@ public class ContextFilterTest {
given(invoker.getUrl()).willReturn(url);
contextFilter.invoke(invoker, invocation);
- assertNull(RpcContext.getServiceContext().getInvoker());
+ assertNotNull(RpcContext.getServiceContext().getInvoker());
}
@Test
@@ -71,6 +71,6 @@ public class ContextFilterTest {
Invoker<DemoService> invoker = new MyInvoker<DemoService>(url);
Invocation invocation = new MockInvocation();
Result result = contextFilter.invoke(invoker, invocation);
- assertNull(RpcContext.getServiceContext().getInvoker());
+ assertNotNull(RpcContext.getServiceContext().getInvoker());
}
}