You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/12/26 09:26:52 UTC

[dubbo] branch 3.0.5-release updated: try moving finally block (#9496)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0.5-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0.5-release by this push:
     new 2759f38  try moving finally block (#9496)
2759f38 is described below

commit 2759f386b1c91f284d2afbb478b6a1943885ce65
Author: ken.lj <ke...@gmail.com>
AuthorDate: Sun Dec 26 17:26:13 2021 +0800

    try moving finally block (#9496)
---
 .../filter/support/ConsumerContextFilter.java      | 37 ++++++++++--------
 .../dubbo/common/constants/CommonConstants.java    |  2 +
 .../org/apache/dubbo/rpc/AsyncContextImpl.java     |  2 +-
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  | 17 +++++----
 .../main/java/org/apache/dubbo/rpc/RpcContext.java | 44 +++++++++-------------
 .../org/apache/dubbo/rpc/RpcServiceContext.java    | 13 -------
 .../org/apache/dubbo/rpc/filter/ContextFilter.java | 24 ++++++------
 .../dubbo/rpc/proxy/AbstractProxyInvoker.java      |  8 +++-
 .../apache/dubbo/rpc/filter/ContextFilterTest.java |  6 +--
 9 files changed, 72 insertions(+), 81 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
index 846a740..79eb7b8 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/support/ConsumerContextFilter.java
@@ -93,35 +93,40 @@ public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Liste
             ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
         }
 
-        try {
-            // pass default timeout set by end user (ReferenceConfig)
-            Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
-            if (countDown != null) {
-                TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
-                if (timeoutCountDown.isExpired()) {
-                    return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
-                            "No time left for making the following call: " + invocation.getServiceName() + "."
-                                    + invocation.getMethodName() + ", terminate directly."), invocation);
-                }
+        // pass default timeout set by end user (ReferenceConfig)
+        Object countDown = context.getObjectAttachment(TIME_COUNTDOWN_KEY);
+        if (countDown != null) {
+            TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
+            if (timeoutCountDown.isExpired()) {
+                return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
+                    "No time left for making the following call: " + invocation.getServiceName() + "."
+                        + invocation.getMethodName() + ", terminate directly."), invocation);
             }
-
-            RpcContext.removeServerContext();
-            return invoker.invoke(invocation);
-        } finally {
-            RpcContext.removeServiceContext();
-            RpcContext.removeClientAttachment();
         }
+
+        RpcContext.removeServerContext();
+        return invoker.invoke(invocation);
     }
 
     @Override
     public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
         // pass attachments to result
         RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
+
+        removeContext();
     }
 
     @Override
     public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+        removeContext();
+    }
 
+    private void removeContext() {
+        RpcContext.removeServiceContext();
+        RpcContext.removeClientAttachment();
+        // server context must not be removed because user might use it on callback.
+        // So clearing it is delayed until the start of the next RPC call; see RpcContext.removeServerContext() in invoke() above.
+        // RpcContext.removeServerContext();
     }
 
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index 7b9de27..be5e20d 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -508,4 +508,6 @@ public interface CommonConstants {
 
     String STAGED_CLASSLOADER_KEY = "STAGED_CLASSLOADER";
 
+    String PROVIDER_ASYNC_KEY = "PROVIDER_ASYNC";
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
index 9f9a083..4ff3368 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
@@ -31,7 +31,7 @@ public class AsyncContextImpl implements AsyncContext {
     private ClassLoader stagedClassLoader;
 
     public AsyncContextImpl() {
-        restoreContext = RpcContext.storeContext(false);
+        restoreContext = RpcContext.storeContext();
         restoreClassLoader = Thread.currentThread().getContextClassLoader();
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 80a46da..377432a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 
+import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_ASYNC_KEY;
 import static org.apache.dubbo.common.utils.ReflectUtils.defaultReturn;
 
 /**
@@ -59,6 +60,7 @@ public class AsyncRpcResult implements Result {
     private Executor executor;
 
     private Invocation invocation;
+    private final boolean async;
 
     private CompletableFuture<AppResponse> responseFuture;
 
@@ -66,10 +68,11 @@ public class AsyncRpcResult implements Result {
         this.responseFuture = future;
         this.invocation = invocation;
         RpcInvocation rpcInvocation = (RpcInvocation) invocation;
-        if (InvokeMode.SYNC != rpcInvocation.getInvokeMode() && !future.isDone()) {
-            this.storedContext = RpcContext.storeContext(false);
+        if ((rpcInvocation.get(PROVIDER_ASYNC_KEY) != null || InvokeMode.SYNC != rpcInvocation.getInvokeMode()) && !future.isDone()) {
+            async = true;
+            this.storedContext = RpcContext.clearAndStoreContext();
         } else {
-            this.storedContext = RpcContext.storeContext(true);
+            async = false;
         }
     }
 
@@ -198,12 +201,10 @@ public class AsyncRpcResult implements Result {
 
     public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
         this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
-            RpcContext.RestoreContext tmpContext = RpcContext.storeContext(false);
-            RpcContext.restoreContext(storedContext);
-
+            if (async) {
+                RpcContext.restoreContext(storedContext);
+            }
             fn.accept(v, t);
-
-            RpcContext.restoreContext(tmpContext);
         });
 
         // Necessary! update future in context, see https://github.com/apache/dubbo/issues/9461
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 1db82f1..69e986e 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -803,8 +803,14 @@ public class RpcContext {
         RpcServiceContext.setRpcContext(url);
     }
 
-    protected static RestoreContext storeContext(boolean needCopy) {
-        return new RestoreContext(needCopy);
+    protected static RestoreContext clearAndStoreContext() {
+        RestoreContext restoreContext = new RestoreContext();
+        RpcContext.removeContext();
+        return restoreContext;
+    }
+
+    protected static RestoreContext storeContext() {
+        return new RestoreContext();
     }
 
     protected static void restoreContext(RestoreContext restoreContext) {
@@ -813,22 +819,6 @@ public class RpcContext {
         }
     }
 
-    protected static void restoreClientAttachment(RpcContextAttachment oldContext) {
-        CLIENT_ATTACHMENT.set(oldContext);
-    }
-
-    protected static void restoreServerContext(RpcContextAttachment oldServerContext) {
-        SERVER_LOCAL.set(oldServerContext);
-    }
-
-    protected static void restoreServerAttachment(RpcContextAttachment oldServerContext) {
-        SERVER_ATTACHMENT.set(oldServerContext);
-    }
-
-    protected static void restoreServiceContext(RpcServiceContext oldServiceContext) {
-        SERVICE_CONTEXT.set(oldServiceContext);
-    }
-
     /**
      * Used to temporarily store and restore all kinds of contexts of current thread.
      */
@@ -838,31 +828,31 @@ public class RpcContext {
         private final RpcContextAttachment serverAttachment;
         private final RpcContextAttachment serverLocal;
 
-        public RestoreContext(boolean needCopy) {
-            serviceContext = getServiceContext().copyOf(needCopy);
-            clientAttachment = getClientAttachment().copyOf(needCopy);
-            serverAttachment = getServerAttachment().copyOf(needCopy);
-            serverLocal = getServerContext().copyOf(needCopy);
+        public RestoreContext() {
+            serviceContext = getServiceContext().copyOf(false);
+            clientAttachment = getClientAttachment().copyOf(false);
+            serverAttachment = getServerAttachment().copyOf(false);
+            serverLocal = getServerContext().copyOf(false);
         }
 
         public void restore() {
             if (serviceContext != null) {
-                restoreServiceContext(serviceContext);
+                SERVICE_CONTEXT.set(serviceContext);
             } else {
                 removeServiceContext();
             }
             if (clientAttachment != null) {
-                restoreClientAttachment(clientAttachment);
+                CLIENT_ATTACHMENT.set(clientAttachment);
             } else {
                 removeClientAttachment();
             }
             if (serverAttachment != null) {
-                restoreServerAttachment(serverAttachment);
+                SERVER_ATTACHMENT.set(serverAttachment);
             } else {
                 removeServerAttachment();
             }
             if (serverLocal != null) {
-                restoreServerContext(serverLocal);
+                SERVER_LOCAL.set(serverLocal);
             } else {
                 removeServerContext();
             }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
index e61632d..e59ae9b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
@@ -658,10 +658,6 @@ public class RpcServiceContext extends RpcContext {
      * @return a shallow copy of RpcServiceContext
      */
     public RpcServiceContext copyOf(boolean needCopy) {
-        if (!isValid()) {
-            return this;
-        }
-
         if (needCopy) {
             RpcServiceContext copy = new RpcServiceContext();
             copy.consumerUrl = this.consumerUrl;
@@ -675,13 +671,4 @@ public class RpcServiceContext extends RpcContext {
         }
     }
 
-
-    private boolean isValid() {
-        return this.consumerUrl != null
-            || this.localAddress != null
-            || this.remoteAddress != null
-            || this.invocation != null
-            || this.asyncContext != null;
-    }
-
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
index 3e38c67..3d6aabe 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
@@ -126,27 +126,29 @@ public class ContextFilter implements Filter, Filter.Listener {
             ((RpcInvocation) invocation).setInvoker(invoker);
         }
 
-        try {
-            context.clearAfterEachInvoke(false);
-            return invoker.invoke(invocation);
-        } finally {
-            context.clearAfterEachInvoke(true);
-            RpcContext.removeServerAttachment();
-            RpcContext.removeServiceContext();
-            // IMPORTANT! For async scenario, context must be removed from current thread, so a new RpcContext is always created for the next invoke for the same thread.
-            RpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY);
-            RpcContext.removeServerContext();
-        }
+        context.clearAfterEachInvoke(false);
+
+        return invoker.invoke(invocation);
     }
 
     @Override
     public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
         // pass attachments to result
         appResponse.addObjectAttachments(RpcContext.getServerContext().getObjectAttachments());
+        removeContext();
     }
 
     @Override
     public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+        removeContext();
+    }
+
+    private void removeContext() {
+        RpcContext.getServerAttachment().clearAfterEachInvoke(true); // TODO, not necessary anymore
 
+        RpcContext.removeServerAttachment();
+        RpcContext.removeClientAttachment();
+        RpcContext.removeServiceContext();
+        RpcContext.removeServerContext();
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index 336671a..84dfd33 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -32,6 +32,8 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 
+import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_ASYNC_KEY;
+
 /**
  * This Invoker works on provider side, delegates RPC to interface implementation.
  */
@@ -82,7 +84,7 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
     public Result invoke(Invocation invocation) throws RpcException {
         try {
             Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
-            CompletableFuture<Object> future = wrapWithFuture(value);
+            CompletableFuture<Object> future = wrapWithFuture(value, invocation);
             CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
                 AppResponse result = new AppResponse(invocation);
                 if (t != null) {
@@ -107,10 +109,12 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
         }
     }
 
-    private CompletableFuture<Object> wrapWithFuture(Object value) {
+    private CompletableFuture<Object> wrapWithFuture(Object value, Invocation invocation) {
         if (value instanceof CompletableFuture) {
+            invocation.put(PROVIDER_ASYNC_KEY, Boolean.TRUE);
             return (CompletableFuture<Object>) value;
         } else if (RpcContext.getServiceContext().isAsyncStarted()) {
+            invocation.put(PROVIDER_ASYNC_KEY, Boolean.TRUE);
             return ((AsyncContextImpl) (RpcContext.getServiceContext().getAsyncContext())).getInternalFuture();
         }
         return CompletableFuture.completedFuture(value);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ContextFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ContextFilterTest.java
index 84b6a68..a37f87a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ContextFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/ContextFilterTest.java
@@ -29,7 +29,7 @@ import org.apache.dubbo.rpc.support.MyInvoker;
 
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.mock;
 
@@ -62,7 +62,7 @@ public class ContextFilterTest {
         given(invoker.getUrl()).willReturn(url);
 
         contextFilter.invoke(invoker, invocation);
-        assertNull(RpcContext.getServiceContext().getInvoker());
+        assertNotNull(RpcContext.getServiceContext().getInvoker());
     }
 
     @Test
@@ -71,6 +71,6 @@ public class ContextFilterTest {
         Invoker<DemoService> invoker = new MyInvoker<DemoService>(url);
         Invocation invocation = new MockInvocation();
         Result result = contextFilter.invoke(invoker, invocation);
-        assertNull(RpcContext.getServiceContext().getInvoker());
+        assertNotNull(RpcContext.getServiceContext().getInvoker());
     }
 }