You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/12/26 08:04:46 UTC
[dubbo] 01/01: try moving finally block
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch fix-3.0-context-clear
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit cd84e0bcef375ac15c56ba38084238bc551ac3ac
Author: ken.lj <ke...@gmail.com>
AuthorDate: Sun Dec 26 16:04:01 2021 +0800
try moving finally block
---
.../filter/support/ConsumerContextFilter.java | 37 ++++++++++--------
.../dubbo/common/constants/CommonConstants.java | 2 +
.../org/apache/dubbo/rpc/AsyncContextImpl.java | 2 +-
.../java/org/apache/dubbo/rpc/AsyncRpcResult.java | 17 +++++----
.../main/java/org/apache/dubbo/rpc/RpcContext.java | 44 +++++++++-------------
.../org/apache/dubbo/rpc/RpcServiceContext.java | 13 -------
.../org/apache/dubbo/rpc/filter/ContextFilter.java | 24 ++++++------
.../dubbo/rpc/proxy/AbstractProxyInvoker.java | 5 +++
8 files changed, 68 insertions(+), 76 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
index 846a740..79eb7b8 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
@@ -93,35 +93,40 @@ public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Liste
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
- try {
- // pass default timeout set by end user (ReferenceConfig)
- Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
- if (countDown != null) {
- TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
- if (timeoutCountDown.isExpired()) {
- return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
- "No time left for making the following call: " + invocation.getServiceName() + "."
- + invocation.getMethodName() + ", terminate directly."), invocation);
- }
+ // pass default timeout set by end user (ReferenceConfig)
+ Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
+ if (countDown != null) {
+ TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
+ if (timeoutCountDown.isExpired()) {
+ return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
+ "No time left for making the following call: " + invocation.getServiceName() + "."
+ + invocation.getMethodName() + ", terminate directly."), invocation);
}
-
- RpcContext.removeServerContext();
- return invoker.invoke(invocation);
- } finally {
- RpcContext.removeServiceContext();
- RpcContext.removeClientAttachment();
}
+
+ RpcContext.removeServerContext();
+ return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
+
+ removeContext();
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+ removeContext();
+ }
+ private void removeContext() {
+ RpcContext.removeServiceContext();
+ RpcContext.removeClientAttachment();
+ // server context must not be removed because user might use it on callback.
+ // So clearing it is delayed until the start of the next rpc call, see RpcContext.removeServerContext(); in invoke() above
+ // RpcContext.removeServerContext();
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 7b9de27..be5e20d 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -508,4 +508,6 @@ public interface CommonConstants {
String STAGED_CLASSLOADER_KEY = "STAGED_CLASSLOADER";
+ String PROVIDER_ASYNC_KEY = "PROVIDER_ASYNC";
+
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
index 9f9a083..4ff3368 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
@@ -31,7 +31,7 @@ public class AsyncContextImpl implements AsyncContext {
private ClassLoader stagedClassLoader;
public AsyncContextImpl() {
- restoreContext = RpcContext.storeContext(false);
+ restoreContext = RpcContext.storeContext();
restoreClassLoader = Thread.currentThread().getContextClassLoader();
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 80a46da..de485b8 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_ASYNC_KEY;
import static org.apache.dubbo.common.utils.ReflectUtils.defaultReturn;
/**
@@ -59,6 +60,7 @@ public class AsyncRpcResult implements Result {
private Executor executor;
private Invocation invocation;
+ private final boolean async;
private CompletableFuture<AppResponse> responseFuture;
@@ -66,10 +68,11 @@ public class AsyncRpcResult implements Result {
this.responseFuture = future;
this.invocation = invocation;
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
- if (InvokeMode.SYNC != rpcInvocation.getInvokeMode() && !future.isDone()) {
- this.storedContext = RpcContext.storeContext(false);
+ if (((Boolean) rpcInvocation.get(PROVIDER_ASYNC_KEY) || InvokeMode.SYNC != rpcInvocation.getInvokeMode()) && !future.isDone()) {
+ async = true;
+ this.storedContext = RpcContext.clearAndStoreContext();
} else {
- this.storedContext = RpcContext.storeContext(true);
+ async = false;
}
}
@@ -198,12 +201,10 @@ public class AsyncRpcResult implements Result {
public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
- RpcContext.RestoreContext tmpContext = RpcContext.storeContext(false);
- RpcContext.restoreContext(storedContext);
-
+ if (async) {
+ RpcContext.restoreContext(storedContext);
+ }
fn.accept(v, t);
-
- RpcContext.restoreContext(tmpContext);
});
// Necessary! update future in context, see https://github.com/apache/dubbo/issues/9461
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 1db82f1..69e986e 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -803,8 +803,14 @@ public class RpcContext {
RpcServiceContext.setRpcContext(url);
}
- protected static RestoreContext storeContext(boolean needCopy) {
- return new RestoreContext(needCopy);
+ protected static RestoreContext clearAndStoreContext() {
+ RestoreContext restoreContext = new RestoreContext();
+ RpcContext.removeContext();
+ return restoreContext;
+ }
+
+ protected static RestoreContext storeContext() {
+ return new RestoreContext();
}
protected static void restoreContext(RestoreContext restoreContext) {
@@ -813,22 +819,6 @@ public class RpcContext {
}
}
- protected static void restoreClientAttachment(RpcContextAttachment oldContext) {
- CLIENT_ATTACHMENT.set(oldContext);
- }
-
- protected static void restoreServerContext(RpcContextAttachment oldServerContext) {
- SERVER_LOCAL.set(oldServerContext);
- }
-
- protected static void restoreServerAttachment(RpcContextAttachment oldServerContext) {
- SERVER_ATTACHMENT.set(oldServerContext);
- }
-
- protected static void restoreServiceContext(RpcServiceContext oldServiceContext) {
- SERVICE_CONTEXT.set(oldServiceContext);
- }
-
/**
* Used to temporarily store and restore all kinds of contexts of current thread.
*/
@@ -838,31 +828,31 @@ public class RpcContext {
private final RpcContextAttachment serverAttachment;
private final RpcContextAttachment serverLocal;
- public RestoreContext(boolean needCopy) {
- serviceContext = getServiceContext().copyOf(needCopy);
- clientAttachment = getClientAttachment().copyOf(needCopy);
- serverAttachment = getServerAttachment().copyOf(needCopy);
- serverLocal = getServerContext().copyOf(needCopy);
+ public RestoreContext() {
+ serviceContext = getServiceContext().copyOf(false);
+ clientAttachment = getClientAttachment().copyOf(false);
+ serverAttachment = getServerAttachment().copyOf(false);
+ serverLocal = getServerContext().copyOf(false);
}
public void restore() {
if (serviceContext != null) {
- restoreServiceContext(serviceContext);
+ SERVICE_CONTEXT.set(serviceContext);
} else {
removeServiceContext();
}
if (clientAttachment != null) {
- restoreClientAttachment(clientAttachment);
+ CLIENT_ATTACHMENT.set(clientAttachment);
} else {
removeClientAttachment();
}
if (serverAttachment != null) {
- restoreServerAttachment(serverAttachment);
+ SERVER_ATTACHMENT.set(serverAttachment);
} else {
removeServerAttachment();
}
if (serverLocal != null) {
- restoreServerContext(serverLocal);
+ SERVER_LOCAL.set(serverLocal);
} else {
removeServerContext();
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
index e61632d..e59ae9b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
@@ -658,10 +658,6 @@ public class RpcServiceContext extends RpcContext {
* @return a shallow copy of RpcServiceContext
*/
public RpcServiceContext copyOf(boolean needCopy) {
- if (!isValid()) {
- return this;
- }
-
if (needCopy) {
RpcServiceContext copy = new RpcServiceContext();
copy.consumerUrl = this.consumerUrl;
@@ -675,13 +671,4 @@ public class RpcServiceContext extends RpcContext {
}
}
-
- private boolean isValid() {
- return this.consumerUrl != null
- || this.localAddress != null
- || this.remoteAddress != null
- || this.invocation != null
- || this.asyncContext != null;
- }
-
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
index 3e38c67..3d6aabe 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
@@ -126,27 +126,29 @@ public class ContextFilter implements Filter, Filter.Listener {
((RpcInvocation) invocation).setInvoker(invoker);
}
- try {
- context.clearAfterEachInvoke(false);
- return invoker.invoke(invocation);
- } finally {
- context.clearAfterEachInvoke(true);
- RpcContext.removeServerAttachment();
- RpcContext.removeServiceContext();
- // IMPORTANT! For async scenario, context must be removed from current thread, so a new RpcContext is always created for the next invoke for the same thread.
- RpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY);
- RpcContext.removeServerContext();
- }
+ context.clearAfterEachInvoke(false);
+
+ return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// pass attachments to result
appResponse.addObjectAttachments(RpcContext.getServerContext().getObjectAttachments());
+ removeContext();
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+ removeContext();
+ }
+
+ private void removeContext() {
+ RpcContext.getServerAttachment().clearAfterEachInvoke(true); // TODO, not necessary anymore
+ RpcContext.removeServerAttachment();
+ RpcContext.removeClientAttachment();
+ RpcContext.removeServiceContext();
+ RpcContext.removeServerContext();
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index 336671a..a28528e 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -32,6 +32,8 @@ import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_ASYNC_KEY;
+
/**
* This Invoker works on provider side, delegates RPC to interface implementation.
*/
@@ -96,6 +98,9 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
}
return result;
});
+ if (RpcContext.getServiceContext().isAsyncStarted()) {
+ invocation.put(PROVIDER_ASYNC_KEY, Boolean.TRUE);
+ }
return new AsyncRpcResult(appResponseFuture, invocation);
} catch (InvocationTargetException e) {
if (RpcContext.getServiceContext().isAsyncStarted() && !RpcContext.getServiceContext().stopAsync()) {