You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2021/12/25 12:25:08 UTC

[dubbo] branch 3.0.5-release updated: fix async call issues, rpc context and response future callback race conditions (#9464)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0.5-release
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0.5-release by this push:
     new 098787a  fix async call issues, rpc context and response future callback race conditions (#9464)
098787a is described below

commit 098787a4ef95c53f43f2130c90995cbf4c8e732f
Author: ken.lj <ke...@gmail.com>
AuthorDate: Sat Dec 25 20:24:22 2021 +0800

    fix async call issues, rpc context and response future callback race conditions (#9464)
    
    fixes #9461 #8602
---
 .../cluster/filter/DefaultFilterChainBuilder.java  |   6 +-
 .../rpc/cluster/filter/FilterChainBuilder.java     | 204 ++++++++++++++++++++-
 .../filter/DefaultFilterChainBuilderTest.java      |  12 +-
 .../support/FailoverClusterInvokerTest.java        |   2 +-
 .../support/MergeableClusterInvokerTest.java       |   7 +-
 .../support/wrapper/AbstractClusterTest.java       |   4 +-
 .../java/org/apache/dubbo/common/Experimental.java |   2 +-
 .../dubbo/common/constants/CommonConstants.java    |   5 +
 .../AbstractRegistryCenterExporterListener.java    |  36 +++-
 .../dubbo/auth/filter/ProviderAuthFilterTest.java  |  16 +-
 .../java/org/apache/dubbo/rpc/AsyncContext.java    |  46 +++++
 .../org/apache/dubbo/rpc/AsyncContextImpl.java     |  25 ++-
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  |  44 ++---
 .../main/java/org/apache/dubbo/rpc/BaseFilter.java |  23 +++
 .../org/apache/dubbo/rpc/ListenableFilter.java     |   3 +-
 .../main/java/org/apache/dubbo/rpc/RpcContext.java |  74 +++++++-
 .../org/apache/dubbo/rpc/RpcContextAttachment.java |  24 +++
 .../org/apache/dubbo/rpc/RpcServiceContext.java    |  41 ++++-
 ...rFilter.java => ClassLoaderCallbackFilter.java} |  33 +++-
 .../apache/dubbo/rpc/filter/ClassLoaderFilter.java |  32 +++-
 .../org/apache/dubbo/rpc/filter/ContextFilter.java |   2 +-
 .../dubbo/rpc/proxy/AbstractProxyInvoker.java      |   6 +-
 .../dubbo/internal/org.apache.dubbo.rpc.Filter     |   3 +-
 .../java/org/apache/dubbo/rpc/RpcContextTest.java  |   5 +
 .../rpc/filter/CompatibleFilterFilterTest.java     |  19 +-
 .../apache/dubbo/rpc/filter/EchoFilterTest.java    |   3 +-
 .../apache/dubbo/rpc/filter/TimeoutFilterTest.java |   3 +-
 .../apache/dubbo/rpc/support/MockInvocation.java   |   4 +-
 28 files changed, 576 insertions(+), 108 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
index ff4f9b8..15be300 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilder.java
@@ -67,8 +67,9 @@ public class DefaultFilterChainBuilder implements FilterChainBuilder {
             for (int i = filters.size() - 1; i >= 0; i--) {
                 final Filter filter = filters.get(i);
                 final Invoker<T> next = last;
-                last = new FilterChainNode<>(originalInvoker, next, filter);
+                last = new CopyOfFilterChainNode<>(originalInvoker, next, filter);
             }
+            return new CallbackRegistrationInvoker<>(last, filters);
         }
 
         return last;
@@ -103,8 +104,9 @@ public class DefaultFilterChainBuilder implements FilterChainBuilder {
             for (int i = filters.size() - 1; i >= 0; i--) {
                 final ClusterFilter filter = filters.get(i);
                 final Invoker<T> next = last;
-                last = new ClusterFilterChainNode<>(originalInvoker, next, filter);
+                last = new CopyOfClusterFilterChainNode<>(originalInvoker, next, filter);
             }
+            return new ClusterCallbackRegistrationInvoker<>(originalInvoker, last, filters);
         }
 
         return last;
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
index 7edab08..a329959 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/filter/FilterChainBuilder.java
@@ -16,8 +16,11 @@
  */
 package org.apache.dubbo.rpc.cluster.filter;
 
+import org.apache.dubbo.common.Experimental;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.rpc.BaseFilter;
 import org.apache.dubbo.rpc.Filter;
 import org.apache.dubbo.rpc.Invocation;
@@ -28,6 +31,9 @@ import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.cluster.ClusterInvoker;
 import org.apache.dubbo.rpc.cluster.Directory;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 import static org.apache.dubbo.common.extension.ExtensionScope.APPLICATION;
 
 @SPI(value = "default", scope = APPLICATION)
@@ -144,7 +150,7 @@ public interface FilterChainBuilder {
      * @param <TYPE>
      */
     class ClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
-            extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
+        extends FilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
         public ClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
             super(originalInvoker, nextNode, filter);
         }
@@ -165,4 +171,200 @@ public interface FilterChainBuilder {
             return getOriginalInvoker().isDestroyed();
         }
     }
+
+    class CallbackRegistrationInvoker<T, FILTER extends BaseFilter> implements Invoker<T> {
+        static final Logger LOGGER = LoggerFactory.getLogger(CallbackRegistrationInvoker.class);
+        final Invoker<T> filterInvoker;
+        final List<FILTER> filters;
+
+        public CallbackRegistrationInvoker(Invoker<T> filterInvoker, List<FILTER> filters) {
+            this.filterInvoker = filterInvoker;
+            this.filters = filters;
+        }
+
+        @Override
+        public Result invoke(Invocation invocation) throws RpcException {
+            Result asyncResult = filterInvoker.invoke(invocation);
+            asyncResult.whenCompleteWithContext((r, t) -> {
+                for (int i = filters.size() - 1; i >= 0; i--) {
+                    FILTER filter = filters.get(i);
+                    try {
+                        if (filter instanceof ListenableFilter) {
+                            ListenableFilter listenableFilter = ((ListenableFilter) filter);
+                            Filter.Listener listener = listenableFilter.listener(invocation);
+                            try {
+                                if (listener != null) {
+                                    if (t == null) {
+                                        listener.onResponse(r, filterInvoker, invocation);
+                                    } else {
+                                        listener.onError(t, filterInvoker, invocation);
+                                    }
+                                }
+                            } finally {
+                                listenableFilter.removeListener(invocation);
+                            }
+                        } else if (filter instanceof FILTER.Listener) {
+                            FILTER.Listener listener = (FILTER.Listener) filter;
+                            if (t == null) {
+                                listener.onResponse(r, filterInvoker, invocation);
+                            } else {
+                                listener.onError(t, filterInvoker, invocation);
+                            }
+                        }
+                    } catch (Throwable filterThrowable) {
+                        LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));
+                        if (LOGGER.isDebugEnabled()) {
+                            LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));
+                        }
+                        throw filterThrowable;
+                    }
+                }
+            });
+
+            return asyncResult;
+        }
+
+        @Override
+        public Class<T> getInterface() {
+            return filterInvoker.getInterface();
+        }
+
+        @Override
+        public URL getUrl() {
+            return filterInvoker.getUrl();
+        }
+
+        @Override
+        public boolean isAvailable() {
+            return filterInvoker.isAvailable();
+        }
+
+        @Override
+        public void destroy() {
+            filterInvoker.destroy();
+        }
+    }
+
+    class ClusterCallbackRegistrationInvoker<T, FILTER extends BaseFilter> extends CallbackRegistrationInvoker<T, FILTER>
+        implements ClusterInvoker<T> {
+        private ClusterInvoker<T> originalInvoker;
+
+        public ClusterCallbackRegistrationInvoker(ClusterInvoker<T> originalInvoker, Invoker<T> filterInvoker, List<FILTER> filters) {
+            super(filterInvoker, filters);
+            this.originalInvoker = originalInvoker;
+        }
+
+        public ClusterInvoker<T> getOriginalInvoker() {
+            return originalInvoker;
+        }
+
+        @Override
+        public URL getRegistryUrl() {
+            return getOriginalInvoker().getRegistryUrl();
+        }
+
+        @Override
+        public Directory<T> getDirectory() {
+            return getOriginalInvoker().getDirectory();
+        }
+
+        @Override
+        public boolean isDestroyed() {
+            return getOriginalInvoker().isDestroyed();
+        }
+    }
+
+
+    @Experimental("Works for the same purpose as FilterChainNode, replace FilterChainNode with this one when proved stable enough")
+    class CopyOfFilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T> {
+        TYPE originalInvoker;
+        Invoker<T> nextNode;
+        FILTER filter;
+
+        public CopyOfFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
+            this.originalInvoker = originalInvoker;
+            this.nextNode = nextNode;
+            this.filter = filter;
+        }
+
+        public TYPE getOriginalInvoker() {
+            return originalInvoker;
+        }
+
+        @Override
+        public Class<T> getInterface() {
+            return originalInvoker.getInterface();
+        }
+
+        @Override
+        public URL getUrl() {
+            return originalInvoker.getUrl();
+        }
+
+        @Override
+        public boolean isAvailable() {
+            return originalInvoker.isAvailable();
+        }
+
+        @Override
+        public Result invoke(Invocation invocation) throws RpcException {
+            Result asyncResult;
+            try {
+                asyncResult = filter.invoke(nextNode, invocation);
+            } catch (Exception e) {
+                if (filter instanceof ListenableFilter) {
+                    ListenableFilter listenableFilter = ((ListenableFilter) filter);
+                    try {
+                        Filter.Listener listener = listenableFilter.listener(invocation);
+                        if (listener != null) {
+                            listener.onError(e, originalInvoker, invocation);
+                        }
+                    } finally {
+                        listenableFilter.removeListener(invocation);
+                    }
+                } else if (filter instanceof FILTER.Listener) {
+                    FILTER.Listener listener = (FILTER.Listener) filter;
+                    listener.onError(e, originalInvoker, invocation);
+                }
+                throw e;
+            } finally {
+
+            }
+            return asyncResult;
+        }
+
+        @Override
+        public void destroy() {
+            originalInvoker.destroy();
+        }
+
+        @Override
+        public String toString() {
+            return originalInvoker.toString();
+        }
+    }
+
+    @Experimental("Works for the same purpose as ClusterFilterChainNode, replace ClusterFilterChainNode with this one when proved stable enough")
+    class CopyOfClusterFilterChainNode<T, TYPE extends ClusterInvoker<T>, FILTER extends BaseFilter>
+        extends CopyOfFilterChainNode<T, TYPE, FILTER> implements ClusterInvoker<T> {
+        public CopyOfClusterFilterChainNode(TYPE originalInvoker, Invoker<T> nextNode, FILTER filter) {
+            super(originalInvoker, nextNode, filter);
+        }
+
+
+        @Override
+        public URL getRegistryUrl() {
+            return getOriginalInvoker().getRegistryUrl();
+        }
+
+        @Override
+        public Directory<T> getDirectory() {
+            return getOriginalInvoker().getDirectory();
+        }
+
+        @Override
+        public boolean isDestroyed() {
+            return getOriginalInvoker().isDestroyed();
+        }
+    }
 }
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java
index 8a579b8..7a5779a 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/filter/DefaultFilterChainBuilderTest.java
@@ -19,16 +19,17 @@ package org.apache.dubbo.rpc.cluster.filter;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.protocol.AbstractInvoker;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.REFERENCE_FILTER_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
 
 public class DefaultFilterChainBuilderTest {
 
@@ -62,8 +63,8 @@ public class DefaultFilterChainBuilderTest {
             }
         };
         invokerAfterBuild = defaultFilterChainBuilder.buildInvokerChain(invokerWithFilter, REFERENCE_FILTER_KEY, CONSUMER);
-        Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.FilterChainNode);
-        Assertions.assertTrue(((FilterChainBuilder.FilterChainNode<?, ?, ?>) invokerAfterBuild).filter instanceof LogFilter);
+        Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.CallbackRegistrationInvoker);
+        Assertions.assertEquals(1, ((FilterChainBuilder.CallbackRegistrationInvoker<?, ?>) invokerAfterBuild).filters.size());
 
     }
 
@@ -97,8 +98,7 @@ public class DefaultFilterChainBuilderTest {
             }
         };
         invokerAfterBuild = defaultFilterChainBuilder.buildInvokerChain(invokerWithFilter, REFERENCE_FILTER_KEY, CONSUMER);
-        Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.FilterChainNode);
-        Assertions.assertTrue(((FilterChainBuilder.FilterChainNode<?, ?, ?>) invokerAfterBuild).filter instanceof LogFilter);
+        Assertions.assertTrue(invokerAfterBuild instanceof FilterChainBuilder.CallbackRegistrationInvoker);
 
     }
 }
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
index 4b9fba5..1fd4075 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java
@@ -295,7 +295,7 @@ public class FailoverClusterInvokerTest {
             }
             invokers.clear();
             MockInvoker<Demo> invoker3 = new MockInvoker<>(Demo.class, url);
-            invoker3.setResult(AsyncRpcResult.newDefaultAsyncResult(null));
+            invoker3.setResult(AsyncRpcResult.newDefaultAsyncResult(mock(RpcInvocation.class)));
             invokers.add(invoker3);
             dic.notify(invokers);
             return null;
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java
index bb2ff1b..b95d0ef 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/MergeableClusterInvokerTest.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.cluster.Directory;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.ModuleModel;
@@ -41,8 +42,6 @@ import java.util.Map;
 
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.rpc.Constants.MERGER_KEY;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -55,7 +54,7 @@ public class MergeableClusterInvokerTest {
     private Directory directory = mock(Directory.class);
     private Invoker firstInvoker = mock(Invoker.class);
     private Invoker secondInvoker = mock(Invoker.class);
-    private Invocation invocation = mock(Invocation.class);
+    private Invocation invocation = mock(RpcInvocation.class);
     private ModuleModel moduleModel = mock(ModuleModel.class);
 
     private MergeableClusterInvoker<MenuService> mergeableClusterInvoker;
@@ -98,7 +97,7 @@ public class MergeableClusterInvokerTest {
         directory = mock(Directory.class);
         firstInvoker = mock(Invoker.class);
         secondInvoker = mock(Invoker.class);
-        invocation = mock(Invocation.class);
+        invocation = mock(RpcInvocation.class);
 
     }
 
diff --git a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java
index 41eff47..6245b71 100644
--- a/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java
+++ b/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/wrapper/AbstractClusterTest.java
@@ -28,8 +28,8 @@ import org.apache.dubbo.rpc.cluster.LoadBalance;
 import org.apache.dubbo.rpc.cluster.filter.DemoService;
 import org.apache.dubbo.rpc.cluster.filter.FilterChainBuilder;
 import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;
-
 import org.apache.dubbo.rpc.model.ApplicationModel;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -69,7 +69,7 @@ public class AbstractClusterTest {
         Invoker<?> invoker = demoCluster.join(directory, true);
         Assertions.assertTrue(invoker instanceof AbstractCluster.ClusterFilterInvoker);
         Assertions.assertTrue(((AbstractCluster.ClusterFilterInvoker<?>) invoker).getFilterInvoker()
-            instanceof FilterChainBuilder.ClusterFilterChainNode);
+            instanceof FilterChainBuilder.ClusterCallbackRegistrationInvoker);
 
 
     }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java
index 64f4cd3..746bc1b 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Experimental.java
@@ -22,7 +22,7 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
 /**
- * Indicating unstable API, may get removed or changed in the next release.
+ * Indicating unstable API, may get removed or changed in future releases.
  */
 @Retention(RetentionPolicy.CLASS)
 @Target({
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index f564ea2..7b9de27 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -503,4 +503,9 @@ public interface CommonConstants {
     String ENABLE_CONNECTIVITY_VALIDATION = "dubbo.connectivity.validation";
 
     String DUBBO_INTERNAL_APPLICATION = "DUBBO_INTERNAL_APPLICATION";
+
+    String WORKING_CLASSLOADER_KEY = "WORKING_CLASSLOADER";
+
+    String STAGED_CLASSLOADER_KEY = "STAGED_CLASSLOADER";
+
 }
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java
index 69a3d5e..74e1e69 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/AbstractRegistryCenterExporterListener.java
@@ -26,9 +26,9 @@ import org.apache.dubbo.rpc.listener.ListenerExporterWrapper;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.HashSet;
 
 /**
  * The abstraction of {@link ExporterListener} is to record exported exporters, which should be extended by different sub-classes.
@@ -56,12 +56,13 @@ public abstract class AbstractRegistryCenterExporterListener implements Exporter
     @Override
     public void exported(Exporter<?> exporter) throws RpcException {
         ListenerExporterWrapper listenerExporterWrapper = (ListenerExporterWrapper) exporter;
-        FilterChainBuilder.FilterChainNode filterChainNode = (FilterChainBuilder.FilterChainNode) listenerExporterWrapper.getInvoker();
-        if (filterChainNode == null ||
-            filterChainNode.getInterface() != getInterface()) {
+        FilterChainBuilder.CallbackRegistrationInvoker callbackRegistrationInvoker = (FilterChainBuilder.CallbackRegistrationInvoker) listenerExporterWrapper.getInvoker();
+        if (callbackRegistrationInvoker == null ||
+            callbackRegistrationInvoker.getInterface() != getInterface()) {
             return;
         }
         exportedExporters.add(exporter);
+        FilterChainBuilder.CopyOfFilterChainNode filterChainNode = getFilterChainNode(callbackRegistrationInvoker);
         do {
             Filter filter = this.getFilter(filterChainNode);
             if (filter != null) {
@@ -96,7 +97,24 @@ public abstract class AbstractRegistryCenterExporterListener implements Exporter
     /**
      * Use reflection to obtain {@link Filter}
      */
-    private Filter getFilter(FilterChainBuilder.FilterChainNode filterChainNode) {
+    private FilterChainBuilder.CopyOfFilterChainNode getFilterChainNode(FilterChainBuilder.CallbackRegistrationInvoker callbackRegistrationInvoker) {
+        if (callbackRegistrationInvoker != null) {
+            Field field = null;
+            try {
+                field = callbackRegistrationInvoker.getClass().getDeclaredField("filterInvoker");
+                field.setAccessible(true);
+                return (FilterChainBuilder.CopyOfFilterChainNode) field.get(callbackRegistrationInvoker);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                // ignore
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Use reflection to obtain {@link Filter}
+     */
+    private Filter getFilter(FilterChainBuilder.CopyOfFilterChainNode filterChainNode) {
         if (filterChainNode != null) {
             Field field = null;
             try {
@@ -111,17 +129,17 @@ public abstract class AbstractRegistryCenterExporterListener implements Exporter
     }
 
     /**
-     * Use reflection to obtain {@link FilterChainBuilder.FilterChainNode}
+     * Use reflection to obtain {@link FilterChainBuilder.CopyOfFilterChainNode}
      */
-    private FilterChainBuilder.FilterChainNode getNextNode(FilterChainBuilder.FilterChainNode filterChainNode) {
+    private FilterChainBuilder.CopyOfFilterChainNode getNextNode(FilterChainBuilder.CopyOfFilterChainNode filterChainNode) {
         if (filterChainNode != null) {
             Field field = null;
             try {
                 field = filterChainNode.getClass().getDeclaredField("nextNode");
                 field.setAccessible(true);
                 Object object = field.get(filterChainNode);
-                if (object instanceof FilterChainBuilder.FilterChainNode) {
-                    return (FilterChainBuilder.FilterChainNode) object;
+                if (object instanceof FilterChainBuilder.CopyOfFilterChainNode) {
+                    return (FilterChainBuilder.CopyOfFilterChainNode) object;
                 }
             } catch (NoSuchFieldException | IllegalAccessException e) {
                 // ignore
diff --git a/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java b/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java
index aac6287..6e12f6f 100644
--- a/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java
+++ b/dubbo-plugin/dubbo-auth/src/test/java/org/apache/dubbo/auth/filter/ProviderAuthFilterTest.java
@@ -24,7 +24,9 @@ import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.model.ApplicationModel;
+
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -43,7 +45,7 @@ class ProviderAuthFilterTest {
     void testAuthDisabled() {
         URL url = mock(URL.class);
         Invoker invoker = mock(Invoker.class);
-        Invocation invocation = mock(Invocation.class);
+        Invocation invocation = mock(RpcInvocation.class);
         when(invoker.getUrl()).thenReturn(url);
         ProviderAuthFilter providerAuthFilter = new ProviderAuthFilter(ApplicationModel.defaultModel());
         providerAuthFilter.invoke(invoker, invocation);
@@ -58,7 +60,7 @@ class ProviderAuthFilterTest {
                 .addParameter(CommonConstants.APPLICATION_KEY, "test")
                 .addParameter(Constants.SERVICE_AUTH, true);
         Invoker invoker = mock(Invoker.class);
-        Invocation invocation = mock(Invocation.class);
+        Invocation invocation = mock(RpcInvocation.class);
         when(invoker.getUrl()).thenReturn(url);
         ProviderAuthFilter providerAuthFilter = new ProviderAuthFilter(ApplicationModel.defaultModel());
         providerAuthFilter.invoke(invoker, invocation);
@@ -74,7 +76,7 @@ class ProviderAuthFilterTest {
                 .addParameter(CommonConstants.APPLICATION_KEY, "test")
                 .addParameter(Constants.SERVICE_AUTH, true);
         Invoker invoker = mock(Invoker.class);
-        Invocation invocation = mock(Invocation.class);
+        Invocation invocation = mock(RpcInvocation.class);
         when(invocation.getAttachment(Constants.REQUEST_SIGNATURE_KEY)).thenReturn(null);
         when(invoker.getUrl()).thenReturn(url);
 
@@ -92,7 +94,7 @@ class ProviderAuthFilterTest {
                 .addParameter(CommonConstants.APPLICATION_KEY, "test")
                 .addParameter(Constants.SERVICE_AUTH, true);
         Invoker invoker = mock(Invoker.class);
-        Invocation invocation = mock(Invocation.class);
+        Invocation invocation = mock(RpcInvocation.class);
         when(invocation.getAttachment(Constants.REQUEST_SIGNATURE_KEY)).thenReturn(null);
         when(invoker.getUrl()).thenReturn(url);
 
@@ -107,7 +109,7 @@ class ProviderAuthFilterTest {
                 .addParameter(CommonConstants.APPLICATION_KEY, "test-provider")
                 .addParameter(Constants.SERVICE_AUTH, true);
         Invoker invoker = mock(Invoker.class);
-        Invocation invocation = mock(Invocation.class);
+        Invocation invocation = mock(RpcInvocation.class);
         when(invocation.getObjectAttachment(Constants.REQUEST_SIGNATURE_KEY)).thenReturn("dubbo");
         when(invocation.getObjectAttachment(Constants.AK_KEY)).thenReturn("ak");
         when(invocation.getObjectAttachment(CommonConstants.CONSUMER)).thenReturn("test-consumer");
@@ -135,7 +137,7 @@ class ProviderAuthFilterTest {
                 .addParameter(Constants.SERVICE_AUTH, true);
 
         Invoker invoker = mock(Invoker.class);
-        Invocation invocation = mock(Invocation.class);
+        Invocation invocation = mock(RpcInvocation.class);
         when(invocation.getObjectAttachment(Constants.AK_KEY)).thenReturn("ak");
         when(invocation.getObjectAttachment(CommonConstants.CONSUMER)).thenReturn("test-consumer");
         when(invocation.getObjectAttachment(Constants.REQUEST_TIMESTAMP_KEY)).thenReturn(currentTimeMillis);
@@ -168,7 +170,7 @@ class ProviderAuthFilterTest {
                 .addParameter(CommonConstants.APPLICATION_KEY, "test-provider")
                 .addParameter(Constants.SERVICE_AUTH, true);
         Invoker invoker = mock(Invoker.class);
-        Invocation invocation = mock(Invocation.class);
+        Invocation invocation = mock(RpcInvocation.class);
         when(invocation.getAttachment(Constants.AK_KEY)).thenReturn("ak");
         when(invocation.getAttachment(CommonConstants.CONSUMER)).thenReturn("test-consumer");
         when(invocation.getAttachment(Constants.REQUEST_TIMESTAMP_KEY)).thenReturn(String.valueOf(currentTimeMillis));
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
index 1f51a54..0635d29 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContext.java
@@ -74,4 +74,50 @@ public interface AsyncContext {
      * </code>
      */
     void signalContextSwitch();
+
+    /**
+     * Resetting the context is optional. If you do reset it, do so only after the result has been written back.
+     *
+     * <code>
+     * public class AsyncServiceImpl implements AsyncService {
+     * public String sayHello(String name) {
+     * final AsyncContext asyncContext = RpcContext.startAsync();
+     * new Thread(() -> {
+     * <p>
+     * // the right place to use this method
+     * asyncContext.signalContextSwitch();
+     * <p>
+     * try {
+     * Thread.sleep(500);
+     * } catch (InterruptedException e) {
+     * e.printStackTrace();
+     * }
+     * asyncContext.write("Hello " + name + ", response from provider.");
+     * // only reset after asyncContext.write()
+     * asyncContext.resetContext();
+     * }).start();
+     * return null;
+     * }
+     * }
+     * </code>
+     *
+     * <code>
+     * public class AsyncServiceImpl implements AsyncService {
+     * public CompletableFuture sayHello(String name) {
+     * CompletableFuture future = new CompletableFuture();
+     * final AsyncContext asyncContext = RpcContext.startAsync();
+     * new Thread(() -> {
+     * // the right place to use this method
+     * asyncContext.signalContextSwitch();
+     * // some operations...
+     * future.complete();
+     * // only reset after future.complete()
+     * asyncContext.resetContext();
+     * }).start();
+     * return future;
+     * }
+     * }
+     * </code>
+     */
+    void resetContext();
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
index 1331b21..9f9a083 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncContextImpl.java
@@ -26,12 +26,13 @@ public class AsyncContextImpl implements AsyncContext {
 
     private CompletableFuture<Object> future;
 
-    private RpcContextAttachment storedContext;
-    private RpcContextAttachment storedServerContext;
+    private final RpcContext.RestoreContext restoreContext;
+    private final ClassLoader restoreClassLoader;
+    private ClassLoader stagedClassLoader;
 
     public AsyncContextImpl() {
-        this.storedContext = RpcContext.getClientAttachment();
-        this.storedServerContext = RpcContext.getServerContext();
+        restoreContext = RpcContext.storeContext(false);
+        restoreClassLoader = Thread.currentThread().getContextClassLoader();
     }
 
     @Override
@@ -67,9 +68,19 @@ public class AsyncContextImpl implements AsyncContext {
 
     @Override
     public void signalContextSwitch() {
-        RpcContext.restoreContext(storedContext);
-        RpcContext.restoreServerContext(storedServerContext);
-        // Restore any other contexts in here if necessary.
+        RpcContext.restoreContext(restoreContext);
+        if (restoreClassLoader != null) {
+            stagedClassLoader = Thread.currentThread().getContextClassLoader();
+            Thread.currentThread().setContextClassLoader(restoreClassLoader);
+        }
+    }
+
+    @Override
+    public void resetContext() {
+        RpcContext.removeContext();
+        if (stagedClassLoader != null) { // non-null only after signalContextSwitch() swapped the class loader
+            Thread.currentThread().setContextClassLoader(stagedClassLoader); // put back the loader the thread had before the swap
+        }
+    }
 
     public CompletableFuture<Object> getInternalFuture() {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index 1accccd..80a46da 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.threadpool.ThreadlessExecutor;
 import org.apache.dubbo.rpc.model.ConsumerMethodModel;
+import org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -51,10 +52,10 @@ public class AsyncRpcResult implements Result {
 
     /**
      * RpcContext may already have been changed when callback happens, it happens when the same thread is used to execute another RPC call.
-     * So we should keep the reference of current RpcContext instance and restore it before callback being executed.
+     * So we should keep the copy of current RpcContext instance and restore it before callback being executed.
      */
-    private RpcContextAttachment storedContext;
-    private RpcContextAttachment storedServerContext;
+    private RpcContext.RestoreContext storedContext;
+
     private Executor executor;
 
     private Invocation invocation;
@@ -64,8 +65,12 @@ public class AsyncRpcResult implements Result {
     public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) {
         this.responseFuture = future;
         this.invocation = invocation;
-        this.storedContext = RpcContext.getClientAttachment();
-        this.storedServerContext = RpcContext.getServerContext();
+        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
+        if (InvokeMode.SYNC != rpcInvocation.getInvokeMode() && !future.isDone()) {
+            this.storedContext = RpcContext.storeContext(false);
+        } else {
+            this.storedContext = RpcContext.storeContext(true);
+        }
     }
 
     /**
@@ -193,10 +198,17 @@ public class AsyncRpcResult implements Result {
 
     public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn) {
         this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
-            beforeContext.accept(v, t);
+            RpcContext.RestoreContext tmpContext = RpcContext.storeContext(false);
+            RpcContext.restoreContext(storedContext);
+
             fn.accept(v, t);
-            afterContext.accept(v, t);
+
+            RpcContext.restoreContext(tmpContext);
         });
+
+        // Necessary! update future in context, see https://github.com/apache/dubbo/issues/9461
+        RpcContext.getServiceContext().setFuture(new FutureAdapter<>(this.responseFuture));
+
         return this;
     }
 
@@ -281,24 +293,6 @@ public class AsyncRpcResult implements Result {
     }
 
     /**
-     * tmp context to use when the thread switch to Dubbo thread.
-     */
-    private RpcContextAttachment tmpContext;
-
-    private RpcContextAttachment tmpServerContext;
-    private BiConsumer<Result, Throwable> beforeContext = (appResponse, t) -> {
-        tmpContext = RpcContext.getClientAttachment();
-        tmpServerContext = RpcContext.getServerContext();
-        RpcContext.restoreContext(storedContext);
-        RpcContext.restoreServerContext(storedServerContext);
-    };
-
-    private BiConsumer<Result, Throwable> afterContext = (appResponse, t) -> {
-        RpcContext.restoreContext(tmpContext);
-        RpcContext.restoreServerContext(tmpServerContext);
-    };
-
-    /**
      * Some utility methods used to quickly generate default AsyncRpcResult instance.
      */
     public static AsyncRpcResult newDefaultAsyncResult(AppResponse appResponse, Invocation invocation) {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java
index 594954f..02c643a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/BaseFilter.java
@@ -22,10 +22,33 @@ public interface BaseFilter {
      */
     Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
 
+    /**
+     * This callback listener applies to both synchronous and asynchronous calls. Put logic that must run when the rpc
+     * result returns in onResponse or onError, depending on whether the call returned normally or with an exception.
+     * <p>
+     * When refactoring a legacy synchronous-style filter, move the logic that was previously defined in the
+     * 'finally block' to both onResponse and onError.
+     */
     interface Listener {
 
+        /**
+         * This method is called only on successful remote rpc execution, that is, the remote service received
+         * the request and the result (normal or exceptional) was returned successfully.
+         *
+         * @param appResponse the rpc call result, it can represent both normal result and exceptional result
+         * @param invoker     context
+         * @param invocation  context
+         */
         void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
 
+        /**
+         * This method is called on detection of framework exceptions, for example, TimeoutException, NetworkException,
+         * an exception raised in Filters, etc.
+         *
+         * @param t          framework exception
+         * @param invoker    context
+         * @param invocation context
+         */
         void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java
index 160ddcf..b590bfb 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ListenableFilter.java
@@ -23,9 +23,10 @@ import java.util.concurrent.ConcurrentMap;
  * It's recommended to implement Filter.Listener directly for callback registration, check the default implementation,
  * see {@link org.apache.dubbo.rpc.filter.ExceptionFilter}, for example.
  * <p>
- * If you do not want to share Listener instance between RPC calls. You can use ListenableFilter
+ * If you do not want to share a Listener instance between RPC calls, ListenableFilter can be used
  * to keep a 'one Listener each RPC call' model.
  */
+@Deprecated
 public abstract class ListenableFilter implements Filter {
 
     protected Listener listener = null;
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 13f108b..1db82f1 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -122,10 +122,6 @@ public class RpcContext {
         return SERVER_LOCAL.get();
     }
 
-    public static void restoreServerContext(RpcContextAttachment oldServerContext) {
-        SERVER_LOCAL.set(oldServerContext);
-    }
-
     /**
      * remove server side context.
      *
@@ -198,10 +194,6 @@ public class RpcContext {
         this.remove = remove;
     }
 
-    public static void restoreContext(RpcContextAttachment oldContext) {
-        CLIENT_ATTACHMENT.set(oldContext);
-    }
-
     /**
      * remove context.
      *
@@ -810,4 +802,70 @@ public class RpcContext {
     public static void setRpcContext(URL url) {
         RpcServiceContext.setRpcContext(url);
     }
+
+    protected static RestoreContext storeContext(boolean needCopy) {
+        return new RestoreContext(needCopy);
+    }
+
+    protected static void restoreContext(RestoreContext restoreContext) {
+        if (restoreContext != null) {
+            restoreContext.restore();
+        }
+    }
+
+    protected static void restoreClientAttachment(RpcContextAttachment oldContext) {
+        CLIENT_ATTACHMENT.set(oldContext);
+    }
+
+    protected static void restoreServerContext(RpcContextAttachment oldServerContext) {
+        SERVER_LOCAL.set(oldServerContext);
+    }
+
+    protected static void restoreServerAttachment(RpcContextAttachment oldServerContext) {
+        SERVER_ATTACHMENT.set(oldServerContext);
+    }
+
+    protected static void restoreServiceContext(RpcServiceContext oldServiceContext) {
+        SERVICE_CONTEXT.set(oldServiceContext);
+    }
+
+    /**
+     * Snapshot of the current thread's service context, client attachment, server attachment and server-local context; restore() re-installs each one, or removes it when the snapshot holds null.
+     */
+    public static class RestoreContext {
+        private final RpcServiceContext serviceContext;
+        private final RpcContextAttachment clientAttachment;
+        private final RpcContextAttachment serverAttachment;
+        private final RpcContextAttachment serverLocal;
+
+        public RestoreContext(boolean needCopy) {
+            serviceContext = getServiceContext().copyOf(needCopy);
+            clientAttachment = getClientAttachment().copyOf(needCopy);
+            serverAttachment = getServerAttachment().copyOf(needCopy);
+            serverLocal = getServerContext().copyOf(needCopy);
+        }
+
+        public void restore() {
+            if (serviceContext != null) {
+                restoreServiceContext(serviceContext);
+            } else {
+                removeServiceContext();
+            }
+            if (clientAttachment != null) {
+                restoreClientAttachment(clientAttachment);
+            } else {
+                removeClientAttachment();
+            }
+            if (serverAttachment != null) {
+                restoreServerAttachment(serverAttachment);
+            } else {
+                removeServerAttachment();
+            }
+            if (serverLocal != null) {
+                restoreServerContext(serverLocal);
+            } else {
+                removeServerContext();
+            }
+        }
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java
index ddbd981..aec6943 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContextAttachment.java
@@ -201,4 +201,28 @@ public class RpcContextAttachment extends RpcContext{
         return getAttachment(key);
     }
 
+    /**
+     * Also see {@link RpcServiceContext#copyOf(boolean)}
+     * @param needCopy whether to return a new instance instead of this one
+     * @return null if the attachments are empty; otherwise a new instance holding a copy of the attachments map (values are not cloned) when needCopy is true, or this instance when it is false
+     */
+    public RpcContextAttachment copyOf(boolean needCopy) {
+        if (!isValid()) {
+            return null;
+        }
+
+        if (needCopy) {
+            RpcContextAttachment copy = new RpcContextAttachment();
+            if (CollectionUtils.isNotEmptyMap(attachments)) {
+                copy.attachments.putAll(this.attachments);
+            }
+            return copy;
+        } else {
+            return this;
+        }
+    }
+
+    private boolean isValid() {
+        return CollectionUtils.isNotEmptyMap(attachments);
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
index ebec31f..e61632d 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
@@ -41,6 +41,9 @@ public class RpcServiceContext extends RpcContext {
     protected RpcServiceContext() {
     }
 
+    // RPC service context updated before each service call.
+    private URL consumerUrl;
+
     private List<URL> urls;
 
     private URL url;
@@ -584,9 +587,6 @@ public class RpcServiceContext extends RpcContext {
         return asyncContext;
     }
 
-    // RPC service context updated before each service call.
-    private URL consumerUrl;
-
     @Override
     public String getGroup() {
         if (consumerUrl == null) {
@@ -649,4 +649,39 @@ public class RpcServiceContext extends RpcContext {
         RpcServiceContext rpcContext = RpcContext.getServiceContext();
         rpcContext.setConsumerUrl(url);
     }
+
+    /**
+     * Only part of the properties are copied, the others are either not used currently or can be got from invocation.
+     * Also see {@link RpcContextAttachment#copyOf(boolean)}
+     *
+     * @param needCopy whether to create a new instance; when false this instance itself is returned
+     * @return this instance if needCopy is false or none of the copied properties is set, otherwise a new RpcServiceContext that shares the same property references
+     */
+    public RpcServiceContext copyOf(boolean needCopy) {
+        if (!isValid()) {
+            return this;
+        }
+
+        if (needCopy) {
+            RpcServiceContext copy = new RpcServiceContext();
+            copy.consumerUrl = this.consumerUrl;
+            copy.localAddress = this.localAddress;
+            copy.remoteAddress = this.remoteAddress;
+            copy.invocation = this.invocation;
+            copy.asyncContext = this.asyncContext;
+            return copy;
+        } else {
+            return this;
+        }
+    }
+
+
+    private boolean isValid() {
+        return this.consumerUrl != null
+            || this.localAddress != null
+            || this.remoteAddress != null
+            || this.invocation != null
+            || this.asyncContext != null;
+    }
+
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderCallbackFilter.java
similarity index 56%
copy from dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java
copy to dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderCallbackFilter.java
index c71e2d1..a9b1269 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderCallbackFilter.java
@@ -18,27 +18,40 @@ package org.apache.dubbo.rpc.filter;
 
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.BaseFilter;
 import org.apache.dubbo.rpc.Filter;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 
+import static org.apache.dubbo.common.constants.CommonConstants.WORKING_CLASSLOADER_KEY;
+
 /**
- * Set the current execution thread class loader to service interface's class loader.
+ * Switch thread context class loader on filter callback.
  */
-@Activate(group = CommonConstants.PROVIDER, order = -30000)
-public class ClassLoaderFilter implements Filter {
+@Activate(group = CommonConstants.PROVIDER, order = Integer.MAX_VALUE)
+public class ClassLoaderCallbackFilter implements Filter, BaseFilter.Listener {
 
     @Override
     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
-        ClassLoader ocl = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
-        try {
-            return invoker.invoke(invocation);
-        } finally {
-            Thread.currentThread().setContextClassLoader(ocl);
-        }
+        return invoker.invoke(invocation);
+    }
+
+    @Override
+    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+        setClassLoader(invoker, invocation);
     }
 
+    @Override
+    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+        setClassLoader(invoker, invocation);
+    }
+
+    private void setClassLoader(Invoker<?> invoker, Invocation invocation) {
+        ClassLoader workingClassLoader = (ClassLoader) invocation.get(WORKING_CLASSLOADER_KEY);
+        if (workingClassLoader != null) {
+            Thread.currentThread().setContextClassLoader(workingClassLoader);
+        }
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java
index c71e2d1..6a14fe1 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ClassLoaderFilter.java
@@ -18,27 +18,51 @@ package org.apache.dubbo.rpc.filter;
 
 import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.BaseFilter;
 import org.apache.dubbo.rpc.Filter;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcException;
 
+import static org.apache.dubbo.common.constants.CommonConstants.STAGED_CLASSLOADER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.WORKING_CLASSLOADER_KEY;
+
 /**
  * Set the current execution thread class loader to service interface's class loader.
  */
 @Activate(group = CommonConstants.PROVIDER, order = -30000)
-public class ClassLoaderFilter implements Filter {
+public class ClassLoaderFilter implements Filter, BaseFilter.Listener {
 
     @Override
     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
-        ClassLoader ocl = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
+        ClassLoader stagedClassLoader = Thread.currentThread().getContextClassLoader();
+        ClassLoader effectiveClassLoader = invoker.getInterface().getClassLoader();
+        invocation.put(STAGED_CLASSLOADER_KEY, stagedClassLoader);
+        invocation.put(WORKING_CLASSLOADER_KEY, effectiveClassLoader);
+
+        Thread.currentThread().setContextClassLoader(effectiveClassLoader);
         try {
             return invoker.invoke(invocation);
         } finally {
-            Thread.currentThread().setContextClassLoader(ocl);
+            Thread.currentThread().setContextClassLoader(stagedClassLoader);
         }
     }
 
+    @Override
+    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
+        resetClassLoader(invoker, invocation);
+    }
+
+    @Override
+    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
+        resetClassLoader(invoker, invocation);
+    }
+
+    private void resetClassLoader(Invoker<?> invoker, Invocation invocation) {
+        ClassLoader stagedClassLoader = (ClassLoader) invocation.get(STAGED_CLASSLOADER_KEY);
+        if (stagedClassLoader != null) {
+            Thread.currentThread().setContextClassLoader(stagedClassLoader);
+        }
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
index 08a1286..3e38c67 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
@@ -133,7 +133,7 @@ public class ContextFilter implements Filter, Filter.Listener {
             context.clearAfterEachInvoke(true);
             RpcContext.removeServerAttachment();
             RpcContext.removeServiceContext();
-            // IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
+            // IMPORTANT! For async scenario, context must be removed from current thread, so a new RpcContext is always created for the next invoke for the same thread.
             RpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY);
             RpcContext.removeServerContext();
         }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
index 2230f50..336671a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/AbstractProxyInvoker.java
@@ -108,10 +108,10 @@ public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
     }
 
     private CompletableFuture<Object> wrapWithFuture(Object value) {
-        if (RpcContext.getServiceContext().isAsyncStarted()) {
-            return ((AsyncContextImpl)(RpcContext.getServiceContext().getAsyncContext())).getInternalFuture();
-        } else if (value instanceof CompletableFuture) {
+        if (value instanceof CompletableFuture) {
             return (CompletableFuture<Object>) value;
+        } else if (RpcContext.getServiceContext().isAsyncStarted()) {
+            return ((AsyncContextImpl) (RpcContext.getServiceContext().getAsyncContext())).getInternalFuture();
         }
         return CompletableFuture.completedFuture(value);
     }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
index 7c526c2..34d56b4 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
@@ -4,10 +4,11 @@ genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
 token=org.apache.dubbo.rpc.filter.TokenFilter
 accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter
 classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter
+classloader-callback=org.apache.dubbo.rpc.filter.ClassLoaderCallbackFilter
 context=org.apache.dubbo.rpc.filter.ContextFilter
 exception=org.apache.dubbo.rpc.filter.ExceptionFilter
 executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
 deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
 compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
 timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
-tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
\ No newline at end of file
+tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
index 4f1e6e9..a7dbd8f 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/RpcContextTest.java
@@ -202,4 +202,9 @@ public class RpcContextTest {
         rpcContext.setObjectAttachments(map);
         Assertions.assertEquals(map, rpcContext.getObjectAttachments());
     }
+
+    @Test
+    public void testRestore() {
+
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java
index 7750195..9a5152b 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/CompatibleFilterFilterTest.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.support.DemoService;
 import org.apache.dubbo.rpc.support.Type;
 
@@ -49,7 +50,7 @@ public class CompatibleFilterFilterTest {
 
     @Test
     public void testInvokerGeneric() {
-        invocation = mock(Invocation.class);
+        invocation = mock(RpcInvocation.class);
         given(invocation.getMethodName()).willReturn("$enumlength");
         given(invocation.getParameterTypes()).willReturn(new Class<?>[]{Enum.class});
         given(invocation.getArguments()).willReturn(new Object[]{"hello"});
@@ -69,7 +70,7 @@ public class CompatibleFilterFilterTest {
 
     @Test
     public void testResultHasException() {
-        invocation = mock(Invocation.class);
+        invocation = mock(RpcInvocation.class);
         given(invocation.getMethodName()).willReturn("enumlength");
         given(invocation.getParameterTypes()).willReturn(new Class<?>[]{Enum.class});
         given(invocation.getArguments()).willReturn(new Object[]{"hello"});
@@ -90,7 +91,7 @@ public class CompatibleFilterFilterTest {
 
     @Test
     public void testInvokerJsonPojoSerialization() throws Exception {
-        invocation = mock(Invocation.class);
+        invocation = mock(RpcInvocation.class);
         given(invocation.getMethodName()).willReturn("enumlength");
         given(invocation.getParameterTypes()).willReturn(new Class<?>[]{Type[].class});
         given(invocation.getArguments()).willReturn(new Object[]{"hello"});
@@ -100,7 +101,8 @@ public class CompatibleFilterFilterTest {
         given(invoker.getInterface()).willReturn(DemoService.class);
         AppResponse result = new AppResponse();
         result.setValue("High");
-        given(invoker.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult(result, invocation));
+        AsyncRpcResult defaultAsyncResult = AsyncRpcResult.newDefaultAsyncResult(result, invocation);
+        given(invoker.invoke(invocation)).willReturn(defaultAsyncResult);
         URL url = URL.valueOf("test://test:11/test?group=dubbo&version=1.1&serialization=json");
         given(invoker.getUrl()).willReturn(url);
 
@@ -112,7 +114,7 @@ public class CompatibleFilterFilterTest {
 
     @Test
     public void testInvokerNonJsonEnumSerialization() throws Exception {
-        invocation = mock(Invocation.class);
+        invocation = mock(RpcInvocation.class);
         given(invocation.getMethodName()).willReturn("enumlength");
         given(invocation.getParameterTypes()).willReturn(new Class<?>[]{Type[].class});
         given(invocation.getArguments()).willReturn(new Object[]{"hello"});
@@ -122,7 +124,8 @@ public class CompatibleFilterFilterTest {
         given(invoker.getInterface()).willReturn(DemoService.class);
         AppResponse result = new AppResponse();
         result.setValue("High");
-        given(invoker.invoke(invocation)).willReturn(AsyncRpcResult.newDefaultAsyncResult(result, invocation));
+        AsyncRpcResult defaultAsyncResult = AsyncRpcResult.newDefaultAsyncResult(result, invocation);
+        given(invoker.invoke(invocation)).willReturn(defaultAsyncResult);
         URL url = URL.valueOf("test://test:11/test?group=dubbo&version=1.1");
         given(invoker.getUrl()).willReturn(url);
 
@@ -134,7 +137,7 @@ public class CompatibleFilterFilterTest {
 
     @Test
     public void testInvokerNonJsonNonPojoSerialization() {
-        invocation = mock(Invocation.class);
+        invocation = mock(RpcInvocation.class);
         given(invocation.getMethodName()).willReturn("echo");
         given(invocation.getParameterTypes()).willReturn(new Class<?>[]{String.class});
         given(invocation.getArguments()).willReturn(new Object[]{"hello"});
@@ -154,7 +157,7 @@ public class CompatibleFilterFilterTest {
 
     @Test
     public void testInvokerNonJsonPojoSerialization() {
-        invocation = mock(Invocation.class);
+        invocation = mock(RpcInvocation.class);
         given(invocation.getMethodName()).willReturn("echo");
         given(invocation.getParameterTypes()).willReturn(new Class<?>[]{String.class});
         given(invocation.getArguments()).willReturn(new Object[]{"hello"});
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java
index 76ee024..b31d225 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/EchoFilterTest.java
@@ -22,6 +22,7 @@ import org.apache.dubbo.rpc.Filter;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.support.DemoService;
 
 import org.junit.jupiter.api.Test;
@@ -37,7 +38,7 @@ public class EchoFilterTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testEcho() {
-        Invocation invocation = mock(Invocation.class);
+        Invocation invocation = mock(RpcInvocation.class);
         given(invocation.getMethodName()).willReturn("$echo");
         given(invocation.getParameterTypes()).willReturn(new Class<?>[]{Enum.class});
         given(invocation.getArguments()).willReturn(new Object[]{"hello"});
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java
index be85365..2564273 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/TimeoutFilterTest.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.support.BlockMyInvoker;
 
 import org.junit.jupiter.api.Assertions;
@@ -56,7 +57,7 @@ public class TimeoutFilterTest {
         URL url = URL.valueOf("test://test:11/test?accesslog=true&group=dubbo&version=1.1&timeout=" + timeout);
         Invoker invoker = new BlockMyInvoker(url, (timeout + 100));
 
-        Invocation invocation = Mockito.mock(Invocation.class);
+        Invocation invocation = Mockito.mock(RpcInvocation.class);
         when(invocation.getMethodName()).thenReturn("testInvokeWithTimeout");
 
         Result result = timeoutFilter.invoke(invoker, invocation);
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java
index 2c8a618..ae8a69c 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/support/MockInvocation.java
@@ -17,8 +17,8 @@
 package org.apache.dubbo.rpc.support;
 
 import org.apache.dubbo.rpc.AttachmentsAdapter;
-import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.model.ServiceModel;
 
 import java.util.HashMap;
@@ -34,7 +34,7 @@ import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
 /**
  * MockInvocation.java
  */
-public class MockInvocation implements Invocation {
+public class MockInvocation extends RpcInvocation {
 
     private Map<String, Object> attachments;